rhea-promise
Advanced tools
Comparing version 1.2.0 to 1.2.1
@@ -0,1 +1,5 @@ | ||
### 1.2.1 - (2021-04-15) | ||
- `createSession`, `createReceiver`, and `createSender` methods now only close underlying rhea analogue when cancelled if the resource has already been opened. | ||
### 1.2.0 - 2021-03-25 | ||
@@ -2,0 +6,0 @@ - Exposes the `incoming` getter on the `Session` that lets accessing size and capacity of the incoming deliveries [#79](https://github.com/amqp/rhea-promise/pull/79). |
@@ -322,2 +322,38 @@ "use strict"; | ||
return new Promise((resolve, reject) => { | ||
const abortSignal = options && options.abortSignal; | ||
let onAbort; | ||
if (abortSignal) { | ||
const rejectOnAbort = () => { | ||
const err = utils_1.createAbortError(); | ||
log.error("[%s] [%s]", this.id, err.message); | ||
return reject(err); | ||
}; | ||
onAbort = () => { | ||
removeListeners(); | ||
if (rheaSession.is_open()) { | ||
// This scenario *shouldn't* be possible because if `is_open()` returns true, | ||
// our `onOpen` handler should have executed and removed this abort listener. | ||
// This is a 'just in case' check in case the operation was cancelled sometime | ||
// between when the session's state was updated and when the sessionOpen | ||
// event was handled. | ||
rheaSession.close(); | ||
} | ||
else if (!rheaSession.is_closed()) { | ||
// If the rheaSession isn't closed, then it's possible the peer will still | ||
// attempt to begin the session. | ||
// We can detect that if it occurs and close our session. | ||
rheaSession.once(rhea_1.SessionEvents.sessionOpen, () => { | ||
rheaSession.close(); | ||
}); | ||
} | ||
return rejectOnAbort(); | ||
}; | ||
if (abortSignal.aborted) { | ||
// Exit early before we do any work. | ||
return rejectOnAbort(); | ||
} | ||
else { | ||
abortSignal.addEventListener("abort", onAbort); | ||
} | ||
} | ||
const rheaSession = this._connection.create_session(); | ||
@@ -329,4 +365,2 @@ const session = new session_1.Session(this, rheaSession); | ||
let onDisconnected; | ||
let onAbort; | ||
const abortSignal = options && options.abortSignal; | ||
let waitTimer; | ||
@@ -361,9 +395,2 @@ const removeListeners = () => { | ||
}; | ||
onAbort = () => { | ||
removeListeners(); | ||
rheaSession.close(); | ||
const err = utils_1.createAbortError(); | ||
log.error("[%s] [%s]", this.id, err.message); | ||
return reject(err); | ||
}; | ||
const actionAfterTimeout = () => { | ||
@@ -382,10 +409,2 @@ removeListeners(); | ||
rheaSession.begin(); | ||
if (abortSignal) { | ||
if (abortSignal.aborted) { | ||
onAbort(); | ||
} | ||
else { | ||
abortSignal.addEventListener("abort", onAbort); | ||
} | ||
} | ||
}); | ||
@@ -392,0 +411,0 @@ } |
@@ -234,2 +234,38 @@ "use strict"; | ||
} | ||
const abortSignal = options && options.abortSignal; | ||
let onAbort; | ||
if (abortSignal) { | ||
const rejectOnAbort = () => { | ||
const err = utils_1.createAbortError(); | ||
log.error("[%s] [%s]", this.connection.id, err.message); | ||
return reject(err); | ||
}; | ||
onAbort = () => { | ||
removeListeners(); | ||
if (rheaReceiver.is_open()) { | ||
// This scenario *shouldn't* be possible because if `is_open()` returns true, | ||
// our `onOpen` handler should have executed and removed this abort listener. | ||
// This is a 'just in case' check in case the operation was cancelled sometime | ||
// between when the receiver's state was updated and when the receiverOpen | ||
// event was handled. | ||
rheaReceiver.close(); | ||
} | ||
else if (!rheaReceiver.is_closed()) { | ||
// If the rheaReceiver isn't closed, then it's possible the peer will still | ||
// attempt to attach the link and open our receiver. | ||
// We can detect that if it occurs and close our receiver. | ||
rheaReceiver.once(rhea_1.ReceiverEvents.receiverOpen, () => { | ||
rheaReceiver.close(); | ||
}); | ||
} | ||
return rejectOnAbort(); | ||
}; | ||
if (abortSignal.aborted) { | ||
// Exit early before we do any work. | ||
return rejectOnAbort(); | ||
} | ||
else { | ||
abortSignal.addEventListener("abort", onAbort); | ||
} | ||
} | ||
// Register session handlers for session_error and session_close if provided. | ||
@@ -254,4 +290,2 @@ // listeners provided by the user in the options object should be added | ||
let onDisconnected; | ||
let onAbort; | ||
const abortSignal = options && options.abortSignal; | ||
let waitTimer; | ||
@@ -304,9 +338,2 @@ if (options && options.onMessage) { | ||
}; | ||
onAbort = () => { | ||
removeListeners(); | ||
rheaReceiver.close(); | ||
const err = utils_1.createAbortError(); | ||
log.error("[%s] [%s]", this.connection.id, err.message); | ||
return reject(err); | ||
}; | ||
const actionAfterTimeout = () => { | ||
@@ -324,10 +351,2 @@ removeListeners(); | ||
waitTimer = setTimeout(actionAfterTimeout, this.connection.options.operationTimeoutInSeconds * 1000); | ||
if (abortSignal) { | ||
if (abortSignal.aborted) { | ||
onAbort(); | ||
} | ||
else { | ||
abortSignal.addEventListener("abort", onAbort); | ||
} | ||
} | ||
}); | ||
@@ -372,2 +391,38 @@ } | ||
return new Promise((resolve, reject) => { | ||
const abortSignal = options && options.abortSignal; | ||
let onAbort; | ||
if (abortSignal) { | ||
const rejectOnAbort = () => { | ||
const err = utils_1.createAbortError(); | ||
log.error("[%s] [%s]", this.connection.id, err.message); | ||
return reject(err); | ||
}; | ||
onAbort = () => { | ||
removeListeners(); | ||
if (rheaSender.is_open()) { | ||
// This scenario *shouldn't* be possible because if `is_open()` returns true, | ||
// our `onOpen` handler should have executed and removed this abort listener. | ||
// This is a 'just in case' check in case the operation was cancelled sometime | ||
// between when the sender's state was updated and when the senderOpen | ||
// event was handled. | ||
rheaSender.close(); | ||
} | ||
else if (!rheaSender.is_closed()) { | ||
// If the rheaSender isn't closed, then it's possible the peer will still | ||
// attempt to attach the link and open our sender. | ||
// We can detect that if it occurs and close our sender. | ||
rheaSender.once(rhea_1.SenderEvents.senderOpen, () => { | ||
rheaSender.close(); | ||
}); | ||
} | ||
return rejectOnAbort(); | ||
}; | ||
if (abortSignal.aborted) { | ||
// Exit early before we do any work. | ||
return rejectOnAbort(); | ||
} | ||
else { | ||
abortSignal.addEventListener("abort", onAbort); | ||
} | ||
} | ||
// Register session handlers for session_error and session_close if provided. | ||
@@ -396,4 +451,2 @@ if (options && options.onSessionError) { | ||
let onDisconnected; | ||
let onAbort; | ||
const abortSignal = options && options.abortSignal; | ||
let waitTimer; | ||
@@ -454,9 +507,2 @@ // listeners provided by the user in the options object should be added | ||
}; | ||
onAbort = () => { | ||
removeListeners(); | ||
rheaSender.close(); | ||
const err = utils_1.createAbortError(); | ||
log.error("[%s] [%s]", this.connection.id, err.message); | ||
return reject(err); | ||
}; | ||
const actionAfterTimeout = () => { | ||
@@ -474,10 +520,2 @@ removeListeners(); | ||
waitTimer = setTimeout(actionAfterTimeout, this.connection.options.operationTimeoutInSeconds * 1000); | ||
if (abortSignal) { | ||
if (abortSignal.aborted) { | ||
onAbort(); | ||
} | ||
else { | ||
abortSignal.addEventListener("abort", onAbort); | ||
} | ||
} | ||
}); | ||
@@ -484,0 +522,0 @@ } |
@@ -536,2 +536,39 @@ // Copyright (c) Microsoft Corporation. All rights reserved. | ||
return new Promise((resolve, reject) => { | ||
const abortSignal = options && options.abortSignal; | ||
let onAbort: Func<void, void>; | ||
if (abortSignal) { | ||
const rejectOnAbort = () => { | ||
const err = createAbortError(); | ||
log.error("[%s] [%s]", this.id, err.message); | ||
return reject(err); | ||
}; | ||
onAbort = () => { | ||
removeListeners(); | ||
if (rheaSession.is_open()) { | ||
// This scenario *shouldn't* be possible because if `is_open()` returns true, | ||
// our `onOpen` handler should have executed and removed this abort listener. | ||
// This is a 'just in case' check in case the operation was cancelled sometime | ||
// between when the session's state was updated and when the sessionOpen | ||
// event was handled. | ||
rheaSession.close(); | ||
} else if (!rheaSession.is_closed()) { | ||
// If the rheaSession isn't closed, then it's possible the peer will still | ||
// attempt to begin the session. | ||
// We can detect that if it occurs and close our session. | ||
rheaSession.once(SessionEvents.sessionOpen, () => { | ||
rheaSession.close(); | ||
}); | ||
} | ||
return rejectOnAbort(); | ||
}; | ||
if (abortSignal.aborted) { | ||
// Exit early before we do any work. | ||
return rejectOnAbort(); | ||
} else { | ||
abortSignal.addEventListener("abort", onAbort); | ||
} | ||
} | ||
const rheaSession = this._connection.create_session(); | ||
@@ -543,4 +580,2 @@ const session = new Session(this, rheaSession); | ||
let onDisconnected: Func<RheaEventContext, void>; | ||
let onAbort: Func<void, void>; | ||
const abortSignal = options && options.abortSignal; | ||
let waitTimer: any; | ||
@@ -580,10 +615,2 @@ | ||
onAbort = () => { | ||
removeListeners(); | ||
rheaSession.close(); | ||
const err = createAbortError(); | ||
log.error("[%s] [%s]", this.id, err.message); | ||
return reject(err); | ||
}; | ||
const actionAfterTimeout = () => { | ||
@@ -603,10 +630,2 @@ removeListeners(); | ||
rheaSession.begin(); | ||
if (abortSignal) { | ||
if (abortSignal.aborted) { | ||
onAbort(); | ||
} else { | ||
abortSignal.addEventListener("abort", onAbort); | ||
} | ||
} | ||
}); | ||
@@ -613,0 +632,0 @@ } |
@@ -278,2 +278,39 @@ // Copyright (c) Microsoft Corporation. All rights reserved. | ||
const abortSignal = options && options.abortSignal; | ||
let onAbort: Func<void, void>; | ||
if (abortSignal) { | ||
const rejectOnAbort = () => { | ||
const err = createAbortError(); | ||
log.error("[%s] [%s]", this.connection.id, err.message); | ||
return reject(err); | ||
}; | ||
onAbort = () => { | ||
removeListeners(); | ||
if (rheaReceiver.is_open()) { | ||
// This scenario *shouldn't* be possible because if `is_open()` returns true, | ||
// our `onOpen` handler should have executed and removed this abort listener. | ||
// This is a 'just in case' check in case the operation was cancelled sometime | ||
// between when the receiver's state was updated and when the receiverOpen | ||
// event was handled. | ||
rheaReceiver.close(); | ||
} else if (!rheaReceiver.is_closed()) { | ||
// If the rheaReceiver isn't closed, then it's possible the peer will still | ||
// attempt to attach the link and open our receiver. | ||
// We can detect that if it occurs and close our receiver. | ||
rheaReceiver.once(ReceiverEvents.receiverOpen, () => { | ||
rheaReceiver.close(); | ||
}); | ||
} | ||
return rejectOnAbort(); | ||
}; | ||
if (abortSignal.aborted) { | ||
// Exit early before we do any work. | ||
return rejectOnAbort(); | ||
} else { | ||
abortSignal.addEventListener("abort", onAbort); | ||
} | ||
} | ||
// Register session handlers for session_error and session_close if provided. | ||
@@ -299,4 +336,2 @@ // listeners provided by the user in the options object should be added | ||
let onDisconnected: Func<RheaEventContext, void>; | ||
let onAbort: Func<void, void>; | ||
const abortSignal = options && options.abortSignal; | ||
let waitTimer: any; | ||
@@ -361,10 +396,2 @@ | ||
onAbort = () => { | ||
removeListeners(); | ||
rheaReceiver.close(); | ||
const err = createAbortError(); | ||
log.error("[%s] [%s]", this.connection.id, err.message); | ||
return reject(err); | ||
}; | ||
const actionAfterTimeout = () => { | ||
@@ -383,10 +410,2 @@ removeListeners(); | ||
waitTimer = setTimeout(actionAfterTimeout, this.connection.options!.operationTimeoutInSeconds! * 1000); | ||
if (abortSignal) { | ||
if (abortSignal.aborted) { | ||
onAbort(); | ||
} else { | ||
abortSignal.addEventListener("abort", onAbort); | ||
} | ||
} | ||
}); | ||
@@ -436,2 +455,39 @@ } | ||
return new Promise((resolve, reject) => { | ||
const abortSignal = options && options.abortSignal; | ||
let onAbort: Func<void, void>; | ||
if (abortSignal) { | ||
const rejectOnAbort = () => { | ||
const err = createAbortError(); | ||
log.error("[%s] [%s]", this.connection.id, err.message); | ||
return reject(err); | ||
}; | ||
onAbort = () => { | ||
removeListeners(); | ||
if (rheaSender.is_open()) { | ||
// This scenario *shouldn't* be possible because if `is_open()` returns true, | ||
// our `onOpen` handler should have executed and removed this abort listener. | ||
// This is a 'just in case' check in case the operation was cancelled sometime | ||
// between when the sender's state was updated and when the senderOpen | ||
// event was handled. | ||
rheaSender.close(); | ||
} else if (!rheaSender.is_closed()) { | ||
// If the rheaSender isn't closed, then it's possible the peer will still | ||
// attempt to attach the link and open our sender. | ||
// We can detect that if it occurs and close our sender. | ||
rheaSender.once(SenderEvents.senderOpen, () => { | ||
rheaSender.close(); | ||
}); | ||
} | ||
return rejectOnAbort(); | ||
}; | ||
if (abortSignal.aborted) { | ||
// Exit early before we do any work. | ||
return rejectOnAbort(); | ||
} else { | ||
abortSignal.addEventListener("abort", onAbort); | ||
} | ||
} | ||
// Register session handlers for session_error and session_close if provided. | ||
@@ -461,4 +517,2 @@ if (options && options.onSessionError) { | ||
let onDisconnected: Func<RheaEventContext, void>; | ||
let onAbort: Func<void, void>; | ||
const abortSignal = options && options.abortSignal; | ||
let waitTimer: any; | ||
@@ -525,10 +579,2 @@ | ||
onAbort = () => { | ||
removeListeners(); | ||
rheaSender.close(); | ||
const err = createAbortError(); | ||
log.error("[%s] [%s]", this.connection.id, err.message); | ||
return reject(err); | ||
}; | ||
const actionAfterTimeout = () => { | ||
@@ -547,10 +593,2 @@ removeListeners(); | ||
waitTimer = setTimeout(actionAfterTimeout, this.connection.options!.operationTimeoutInSeconds! * 1000); | ||
if (abortSignal) { | ||
if (abortSignal.aborted) { | ||
onAbort(); | ||
} else { | ||
abortSignal.addEventListener("abort", onAbort); | ||
} | ||
} | ||
}); | ||
@@ -557,0 +595,0 @@ } |
{ | ||
"name": "rhea-promise", | ||
"version": "1.2.0", | ||
"version": "1.2.1", | ||
"description": "A Promisified layer over rhea AMQP client", | ||
@@ -5,0 +5,0 @@ "license": "Apache-2.0", |
@@ -454,2 +454,4 @@ import * as rhea from "rhea"; | ||
assert.isTrue(abortErrorThrown, "AbortError should have been thrown."); | ||
const sessionMap = (connection["_connection"] as any)["local_channel_map"]; | ||
assert.deepEqual(sessionMap, {}); | ||
await connection.close(); | ||
@@ -480,2 +482,13 @@ }); | ||
assert.isTrue(abortErrorThrown, "AbortError should have been thrown."); | ||
const sessionMap = (connection["_connection"] as any)["local_channel_map"]; | ||
// There should be at most 1 session. | ||
const [sessionName] = Object.keys(sessionMap); | ||
const session = sessionName && sessionMap[sessionName]; | ||
if (!session.is_closed()) { | ||
await new Promise(resolve => { | ||
session.once(rhea.SessionEvents.sessionClose, resolve); | ||
}); | ||
} | ||
assert.isTrue(session.is_closed(), "Session should be closed."); | ||
assert.deepEqual(sessionMap, {}); | ||
await connection.close(); | ||
@@ -482,0 +495,0 @@ }); |
@@ -276,2 +276,3 @@ import * as rhea from "rhea"; | ||
assert.isTrue(abortErrorThrown, "AbortError should have been thrown."); | ||
assert.isUndefined(extractLink(session), "Expected the session to not have any links.") | ||
await connection.close(); | ||
@@ -295,2 +296,4 @@ }); | ||
const link = extractLink(session)!; | ||
let abortErrorThrown = false; | ||
@@ -304,2 +307,12 @@ try { | ||
assert.isTrue(abortErrorThrown, "AbortError should have been thrown."); | ||
assert.isFalse(link.is_open(), "Link should not be open."); | ||
// Cancelling link creation should guarantee that the underlying | ||
// link is closed and removed from the session. | ||
if (!link.is_closed()) { | ||
await new Promise(resolve => { | ||
link.once(rhea.SenderEvents.senderClose, resolve); | ||
}); | ||
} | ||
assert.isTrue(link.is_closed(), "Link should be closed."); | ||
assert.isUndefined(extractLink(session), "Expected the session to not have any links.") | ||
await connection.close(); | ||
@@ -331,2 +344,3 @@ }); | ||
assert.isTrue(abortErrorThrown, "AbortError should have been thrown."); | ||
assert.isUndefined(extractLink(session), "Expected the session to not have any links.") | ||
await connection.close(); | ||
@@ -350,2 +364,4 @@ }); | ||
const link = extractLink(session)!; | ||
let abortErrorThrown = false; | ||
@@ -359,2 +375,12 @@ try { | ||
assert.isTrue(abortErrorThrown, "AbortError should have been thrown."); | ||
assert.isFalse(link.is_open(), "Link should not be open."); | ||
// Cancelling link creation should guarantee that the underlying | ||
// link is closed and removed from the session. | ||
if (!link.is_closed()) { | ||
await new Promise(resolve => { | ||
link.once(rhea.SenderEvents.senderClose, resolve); | ||
}); | ||
} | ||
assert.isTrue(link.is_closed(), "Link should be closed."); | ||
assert.isUndefined(extractLink(session), "Expected the session to not have any links.") | ||
await connection.close(); | ||
@@ -386,2 +412,3 @@ }); | ||
assert.isTrue(abortErrorThrown, "AbortError should have been thrown."); | ||
assert.isUndefined(extractLink(session), "Expected the session to not have any links.") | ||
await connection.close(); | ||
@@ -404,2 +431,4 @@ }); | ||
abortController.abort(); | ||
const link = extractLink(session)!; | ||
@@ -414,5 +443,20 @@ let abortErrorThrown = false; | ||
assert.isTrue(abortErrorThrown, "AbortError should have been thrown."); | ||
assert.isFalse(link.is_open(), "Link should not be open."); | ||
// Cancelling link creation should guarantee that the underlying | ||
// link is closed and removed from the session. | ||
if (!link.is_closed()) { | ||
await new Promise(resolve => { | ||
link.once(rhea.ReceiverEvents.receiverClose, resolve); | ||
}); | ||
} | ||
assert.isTrue(link.is_closed(), "Link should be closed."); | ||
assert.isUndefined(extractLink(session), "Expected the session to not have any links.") | ||
await connection.close(); | ||
}); | ||
}); | ||
function extractLink(session: Session) { | ||
return session["_session"].find_link(() => true); | ||
} | ||
}); |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
432009
7501