@azure/core-amqp
Advanced tools
Comparing version 1.0.0-preview.1 to 1.0.0-preview.2
@@ -1,3 +0,15 @@ | ||
## 1.0.0-preview.1.0 - 28th June, 2019 | ||
## 1.0.0-preview.2 - 5th August, 2019 | ||
- Retry updates | ||
- The properties on the `RetryConfig` interface have been refactored for ease of use. The new `RetryOptions` in it will hold configurations like the number of retries, delay between retries, per try timeout etc. | ||
- Support for exponential retry has been added | ||
- Support for cancellation has been added via an optional `AbortSignal` from the [@azure/abort-controller](https://www.npmjs.com/package/@azure/abort-controller) package. | ||
- The `RequestResponseLink` class has been updated to not have retries anymore for the `sendRequest()` method. The caller of this method is expected to add the relevant retries. | ||
- All time related entites have been updated to use milli seconds as the unit of time for consistency. | ||
- New error `InsufficientCreditError` is introduced for the scenario where [rhea](https://www.npmjs.com/package/rhea) is unable to send events due to its internal buffer being full. This is a transient error and so is treated as retryable. | ||
- The error `OperationTimeoutError` was previously mistakenly classified as an AMQP error which is now corrected. Since this can also be a transient error, it is treated as retryable. | ||
## 1.0.0-preview.1 - 28th June, 2019 | ||
This library is based off of the [@azure/amqp-common](https://www.npmjs.com/package/@azure/amqp-common) | ||
@@ -4,0 +16,0 @@ library. Both are meant to contain common functionality required by Azure Javascript libraries that |
@@ -41,3 +41,5 @@ // Copyright (c) Microsoft Corporation. All rights reserved. | ||
}, | ||
operationTimeoutInSeconds: parameters.operationTimeoutInSeconds | ||
operationTimeoutInSeconds: parameters.operationTimeoutInMs | ||
? parameters.operationTimeoutInMs / 1000 | ||
: undefined | ||
}; | ||
@@ -44,0 +46,0 @@ if (parameters.config.webSocket || |
@@ -128,6 +128,2 @@ // Copyright (c) Microsoft Corporation. All rights reserved. | ||
/** | ||
* Error is thrown when timeout happens for the said operation. | ||
*/ | ||
ConditionErrorNameMapper["amqp:operation-timeout"] = "OperationTimeoutError"; | ||
/** | ||
* Error is thrown when an argument has a value that is out of the admissible range. | ||
@@ -335,6 +331,2 @@ */ | ||
/** | ||
* Error is thrown when timeout happens for the said operation. | ||
*/ | ||
ErrorNameConditionMapper["OperationTimeoutError"] = "amqp:operation-timeout"; | ||
/** | ||
* Error is thrown when an argument has a value that is out of the admissible range. | ||
@@ -483,2 +475,5 @@ */ | ||
"OperationCancelledError", | ||
// OperationTimeoutError occurs when the service fails to respond within a given timeframe. | ||
// Since reasons for such failures can be transient, this is treated as a retryable error. | ||
"OperationTimeoutError", | ||
"SenderBusyError", | ||
@@ -488,3 +483,6 @@ "MessagingError", | ||
"ConnectionForcedError", | ||
"TransferLimitExceededError" | ||
"TransferLimitExceededError", | ||
// InsufficientCreditError occurs when the number of credits available on Rhea link is insufficient. | ||
// Since reasons for such shortage can be transient such as for pending delivery of messages, this is treated as a retryable error. | ||
"InsufficientCreditError" | ||
]; | ||
@@ -549,8 +547,3 @@ /** | ||
// with user input and not an issue with the Messaging process. | ||
if (err instanceof TypeError || | ||
err instanceof RangeError || | ||
// instanceof checks on custom Errors doesn't work without manually setting the prototype within the error. | ||
// Must do a name check until AbortError is updated, and that doesn't break compatibility | ||
// https://github.com/Microsoft/TypeScript/wiki/Breaking-Changes#extending-built-ins-like-error-array-and-map-may-no-longer-work | ||
err.name === "AbortError") { | ||
if (err instanceof TypeError || err instanceof RangeError) { | ||
error.retryable = false; | ||
@@ -582,4 +575,5 @@ return error; | ||
} | ||
return error; | ||
} | ||
else if (isSystemError(err)) { | ||
if (isSystemError(err)) { | ||
// translate | ||
@@ -601,4 +595,5 @@ const condition = err.code; | ||
} | ||
return error; | ||
} | ||
else if (isBrowserWebsocketError(err)) { | ||
if (isBrowserWebsocketError(err)) { | ||
// Translate browser communication errors during opening handshake to generic SeviceCommunicationError | ||
@@ -608,10 +603,21 @@ error = new MessagingError("Websocket connection failed."); | ||
error.retryable = false; | ||
return error; | ||
} | ||
else { | ||
// Translate a generic error into MessagingError. | ||
error = new MessagingError(err.message); | ||
error.stack = err.stack; | ||
// instanceof checks on custom Errors doesn't work without manually setting the prototype within the error. | ||
// Must do a name check until the custom error is updated, and that doesn't break compatibility | ||
// https://github.com/Microsoft/TypeScript/wiki/Breaking-Changes#extending-built-ins-like-error-array-and-map-may-no-longer-work | ||
const errorName = err.name; | ||
if (retryableErrors.indexOf(errorName) > -1) { | ||
error.retryable = true; | ||
return error; | ||
} | ||
if (errorName === "AbortError") { | ||
error.retryable = false; | ||
return error; | ||
} | ||
// Translate a generic error into MessagingError. | ||
error = new MessagingError(err.message); | ||
error.stack = err.stack; | ||
return error; | ||
} | ||
//# sourceMappingURL=errors.js.map |
@@ -5,6 +5,6 @@ // Copyright (c) Microsoft Corporation. All rights reserved. | ||
export { RequestResponseLink } from "./requestResponseLink"; | ||
export { retry, RetryOperationType } from "./retry"; | ||
export { retry, RetryOperationType, RetryMode } from "./retry"; | ||
export { DefaultDataTransformer } from "./dataTransformer"; | ||
export { TokenType } from "./auth/token"; | ||
export { isTokenCredential } from "@azure/core-http"; | ||
export { isTokenCredential } from "@azure/core-auth"; | ||
export { SharedKeyCredential } from "./auth/sas"; | ||
@@ -11,0 +11,0 @@ export { IotSharedKeyCredential } from "./auth/iotSas"; |
@@ -6,4 +6,3 @@ // Copyright (c) Microsoft Corporation. All rights reserved. | ||
import * as Constants from "./util/constants"; | ||
import { retry, RetryOperationType } from "./retry"; | ||
import { ReceiverEvents, generate_uuid } from "rhea-promise"; | ||
import { ReceiverEvents } from "rhea-promise"; | ||
import { translate, ConditionStatusMapper } from "./errors"; | ||
@@ -46,5 +45,3 @@ import * as log from "./log"; | ||
* Sends the given request message and returns the received response. If the operation is not | ||
* completed in the provided timeout in seconds `default: 10`, then the request will be retried | ||
* linearly for the provided number of times `default: 3` with the provided delay in seconds | ||
* `default: 15` between each attempt. | ||
* completed in the provided timeout in milliseconds `default: 60000`, then `OperationTimeoutError` is thrown. | ||
* | ||
@@ -55,22 +52,10 @@ * @param {Message} request The AMQP (request) message. | ||
*/ | ||
sendRequest(request, options) { | ||
if (!options) | ||
options = {}; | ||
if (!options.timeoutInSeconds) { | ||
options.timeoutInSeconds = 10; | ||
sendRequest(request, options = {}) { | ||
if (!options.timeoutInMs) { | ||
options.timeoutInMs = Constants.defaultOperationTimeoutInMs; | ||
} | ||
let count = 0; | ||
const aborter = options && options.abortSignal; | ||
const sendRequestPromise = () => new Promise((resolve, reject) => { | ||
const aborter = options.abortSignal; | ||
return new Promise((resolve, reject) => { | ||
let waitTimer; | ||
let timeOver = false; | ||
count++; | ||
if (count !== 1) { | ||
// Generate a new message_id every time after the first attempt | ||
request.message_id = generate_uuid(); | ||
} | ||
else if (!request.message_id) { | ||
// Set the message_id in the first attempt only if it is not set | ||
request.message_id = generate_uuid(); | ||
} | ||
const rejectOnAbort = () => { | ||
@@ -138,5 +123,3 @@ const address = this.receiver.address || "address"; | ||
else { | ||
const condition = info.errorCondition || | ||
ConditionStatusMapper[info.statusCode] || | ||
"amqp:internal-error"; | ||
const condition = info.errorCondition || ConditionStatusMapper[info.statusCode] || "amqp:internal-error"; | ||
const e = { | ||
@@ -161,22 +144,12 @@ condition: condition, | ||
const e = { | ||
condition: ConditionStatusMapper[408], | ||
description: desc | ||
name: "OperationTimeoutError", | ||
message: desc | ||
}; | ||
return reject(translate(e)); | ||
}; | ||
waitTimer = setTimeout(actionAfterTimeout, options.timeoutInMs); | ||
this.receiver.on(ReceiverEvents.message, messageCallback); | ||
waitTimer = setTimeout(actionAfterTimeout, options.timeoutInSeconds * 1000); | ||
log.reqres("[%s] %s request sent: %O", this.connection.id, request.to || "$managment", request); | ||
this.sender.send(request); | ||
}); | ||
const config = { | ||
operation: sendRequestPromise, | ||
connectionId: this.connection.id, | ||
operationType: request.to && request.to === Constants.cbsEndpoint | ||
? RetryOperationType.cbsAuth | ||
: RetryOperationType.management, | ||
delayInSeconds: options.delayInSeconds, | ||
times: options.times | ||
}; | ||
return retry(config); | ||
} | ||
@@ -183,0 +156,0 @@ /** |
@@ -7,3 +7,3 @@ // Copyright (c) Microsoft Corporation. All rights reserved. | ||
import * as log from "./log"; | ||
import { defaultRetryAttempts, defaultDelayBetweenRetriesInSeconds } from "./util/constants"; | ||
import { defaultMaxRetries, defaultDelayBetweenOperationRetriesInMs, defaultMaxDelayForExponentialRetryInMs } from "./util/constants"; | ||
import { resolve } from "dns"; | ||
@@ -26,2 +26,11 @@ /** | ||
/** | ||
* Describes the Retry Mode type | ||
* @enum RetryMode | ||
*/ | ||
export var RetryMode; | ||
(function (RetryMode) { | ||
RetryMode[RetryMode["Exponential"] = 0] = "Exponential"; | ||
RetryMode[RetryMode["Fixed"] = 1] = "Fixed"; | ||
})(RetryMode || (RetryMode = {})); | ||
/** | ||
* Describes the retry operation type. | ||
@@ -76,7 +85,14 @@ * @enum RetryOperationType | ||
/** | ||
* It will attempt to linearly retry an operation specified number of times with a specified | ||
* delay in between each retry. The retries will only happen if the error is retryable. | ||
* Every operation is attempted at least once. Additional attempts are made if the previous attempt failed | ||
* with a retryable error. The number of additional attempts is governed by the `maxRetries` property provided | ||
* on the `RetryConfig` argument. | ||
* | ||
* @param {RetryConfig<T>} config Parameters to configure retry operation. | ||
* If `mode` option is set to `Fixed`, then the retries are made on the | ||
* given operation for a specified number of times, with a fixed delay in between each retry each time. | ||
* | ||
* If `mode` option is set to `Exponential`, then the delay between retries is adjusted to increase | ||
* exponentially with each attempt using back-off factor of power 2. | ||
* | ||
* @param {RetryConfig<T>} config Parameters to configure retry operation | ||
* | ||
* @return {Promise<T>} Promise<T>. | ||
@@ -87,17 +103,28 @@ */ | ||
validateRetryConfig(config); | ||
if (config.times == undefined) | ||
config.times = defaultRetryAttempts; | ||
if (config.delayInSeconds == undefined) { | ||
config.delayInSeconds = defaultDelayBetweenRetriesInSeconds; | ||
if (!config.retryOptions) { | ||
config.retryOptions = {}; | ||
} | ||
if (config.retryOptions.maxRetries == undefined || config.retryOptions.maxRetries < 0) { | ||
config.retryOptions.maxRetries = defaultMaxRetries; | ||
} | ||
if (config.retryOptions.retryDelayInMs == undefined || config.retryOptions.retryDelayInMs < 0) { | ||
config.retryOptions.retryDelayInMs = defaultDelayBetweenOperationRetriesInMs; | ||
} | ||
if (config.retryOptions.maxRetryDelayInMs == undefined || | ||
config.retryOptions.maxRetryDelayInMs < 0) { | ||
config.retryOptions.maxRetryDelayInMs = defaultMaxDelayForExponentialRetryInMs; | ||
} | ||
if (config.retryOptions.mode == undefined) { | ||
config.retryOptions.mode = RetryMode.Fixed; | ||
} | ||
let lastError; | ||
let result; | ||
let success = false; | ||
for (let i = 0; i < config.times; i++) { | ||
const j = i + 1; | ||
log.retry("[%s] Retry for '%s', attempt number: %d", config.connectionId, config.operationType, j); | ||
const totalNumberOfAttempts = config.retryOptions.maxRetries + 1; | ||
for (let i = 1; i <= totalNumberOfAttempts; i++) { | ||
log.retry("[%s] Attempt number: %d", config.connectionId, config.operationType, i); | ||
try { | ||
result = yield config.operation(); | ||
success = true; | ||
log.retry("[%s] Success for '%s', after attempt number: %d.", config.connectionId, config.operationType, j); | ||
log.retry("[%s] Success for '%s', after attempt number: %d.", config.connectionId, config.operationType, i); | ||
if (result && !isDelivery(result)) { | ||
@@ -120,6 +147,15 @@ log.retry("[%s] Success result for '%s': %O", config.connectionId, config.operationType, result); | ||
lastError = err; | ||
log.error("[%s] Error occured for '%s' in attempt number %d: %O", config.connectionId, config.operationType, j, err); | ||
log.error("[%s] Error occured for '%s' in attempt number %d: %O", config.connectionId, config.operationType, i, err); | ||
let targetDelayInMs = config.retryOptions.retryDelayInMs; | ||
if (config.retryOptions.mode === RetryMode.Exponential) { | ||
let incrementDelta = Math.pow(2, i) - 1; | ||
const boundedRandDelta = config.retryOptions.retryDelayInMs * 0.8 + | ||
Math.floor(Math.random() * | ||
(config.retryOptions.retryDelayInMs * 1.2 - config.retryOptions.retryDelayInMs * 0.8)); | ||
incrementDelta *= boundedRandDelta; | ||
targetDelayInMs = Math.min(incrementDelta, config.retryOptions.maxRetryDelayInMs); | ||
} | ||
if (lastError && lastError.retryable) { | ||
log.error("[%s] Sleeping for %d seconds for '%s'.", config.connectionId, config.delayInSeconds, config.operationType); | ||
yield delay(config.delayInSeconds * 1000); | ||
log.error("[%s] Sleeping for %d milliseconds for '%s'.", config.connectionId, targetDelayInMs, config.operationType); | ||
yield delay(targetDelayInMs, config.abortSignal, `The retry operation has been cancelled by the user.`); | ||
continue; | ||
@@ -126,0 +162,0 @@ } |
@@ -52,3 +52,3 @@ // Copyright (c) Microsoft Corporation. All rights reserved. | ||
export const connectionError = "connection_error"; | ||
export const defaultOperationTimeoutInSeconds = 60; | ||
export const defaultOperationTimeoutInMs = 60000; | ||
export const managementRequestKey = "managementRequest"; | ||
@@ -74,8 +74,8 @@ export const negotiateCbsKey = "negotiateCbs"; | ||
export const maxAbsoluteExpiryTime = new Date("9999-12-31T07:59:59.000Z").getTime(); | ||
export const aadTokenValidityMarginSeconds = 5; | ||
export const aadTokenValidityMarginInMs = 5000; | ||
export const connectionReconnectDelay = 300; | ||
export const defaultRetryAttempts = 3; | ||
export const defaultConnectionRetryAttempts = 150; | ||
export const defaultDelayBetweenOperationRetriesInSeconds = 5; | ||
export const defaultDelayBetweenRetriesInSeconds = 15; | ||
export const defaultMaxRetries = 3; | ||
export const defaultMaxRetriesForConnection = 150; | ||
export const defaultDelayBetweenOperationRetriesInMs = 30000; | ||
export const defaultMaxDelayForExponentialRetryInMs = 90000; | ||
export const receiverSettleMode = "receiver-settle-mode"; | ||
@@ -82,0 +82,0 @@ export const dispositionStatus = "disposition-status"; |
// Copyright (c) Microsoft Corporation. All rights reserved. | ||
// Licensed under the MIT License. | ||
import AsyncLock from "async-lock"; | ||
import { AbortError } from "@azure/abort-controller"; | ||
export { AsyncLock }; | ||
@@ -100,8 +101,34 @@ /** | ||
* A wrapper for setTimeout that resolves a promise after t milliseconds. | ||
* @param {number} t - The number of milliseconds to be delayed. | ||
* @param {number} delayInMs - The number of milliseconds to be delayed. | ||
* @param {AbortSignalLike} abortSignal - The abortSignal associated with containing operation. | ||
* @param {string} abortErrorMsg - The abort error message associated with containing operation. | ||
* @param {T} value - The value to be resolved with after a timeout of t milliseconds. | ||
* @returns {Promise<T>} - Resolved promise | ||
*/ | ||
export function delay(t, value) { | ||
return new Promise((resolve) => setTimeout(() => resolve(value), t)); | ||
export function delay(delayInMs, abortSignal, abortErrorMsg, value) { | ||
return new Promise((resolve, reject) => { | ||
const rejectOnAbort = () => { | ||
return reject(new AbortError(abortErrorMsg ? abortErrorMsg : `The delay was cancelled by the user.`)); | ||
}; | ||
const removeListeners = () => { | ||
if (abortSignal) { | ||
abortSignal.removeEventListener("abort", onAborted); | ||
} | ||
}; | ||
const onAborted = () => { | ||
clearTimeout(timer); | ||
removeListeners(); | ||
return rejectOnAbort(); | ||
}; | ||
if (abortSignal && abortSignal.aborted) { | ||
return rejectOnAbort(); | ||
} | ||
const timer = setTimeout(() => { | ||
removeListeners(); | ||
resolve(value); | ||
}, delayInMs); | ||
if (abortSignal) { | ||
abortSignal.addEventListener("abort", onAborted); | ||
} | ||
}); | ||
} | ||
@@ -108,0 +135,0 @@ /** |
{ | ||
"name": "@azure/core-amqp", | ||
"sdk-type": "client", | ||
"version": "1.0.0-preview.1", | ||
"version": "1.0.0-preview.2", | ||
"description": "Common library for amqp based azure sdks like @azure/event-hubs.", | ||
@@ -29,5 +29,5 @@ "author": "Microsoft Corporation", | ||
"build": "tsc -p . && rollup -c 2>&1", | ||
"check-format": "prettier --list-different --config .prettierrc.json \"src/**/*.ts\" \"test/**/*.ts\" \"*.{js,json}\"", | ||
"check-format": "prettier --list-different --config ../../.prettierrc.json \"src/**/*.ts\" \"test/**/*.ts\" \"*.{js,json}\"", | ||
"clean": "rimraf dist dist-esm typings temp browser/*.js* browser/*.zip statistics.html coverage coverage-browser test-browser .nyc_output *.tgz *.log test*.xml TEST*.xml", | ||
"format": "prettier --write --config .prettierrc.json \"src/**/*.ts\" \"test/**/*.ts\" \"*.{js,json}\"", | ||
"format": "prettier --write --config ../../.prettierrc.json \"src/**/*.ts\" \"test/**/*.ts\" \"*.{js,json}\"", | ||
"integration-test:browser": "echo skipped", | ||
@@ -57,3 +57,3 @@ "integration-test:node": "echo skipped", | ||
"@azure/abort-controller": "1.0.0-preview.1", | ||
"@azure/core-http": "^1.0.0-preview.1", | ||
"@azure/core-auth": "1.0.0-preview.2", | ||
"@types/async-lock": "^1.1.0", | ||
@@ -71,9 +71,9 @@ "@types/is-buffer": "^2.0.0", | ||
"url": "^0.11.0", | ||
"util": "^0.11.1" | ||
"util": "^0.12.1" | ||
}, | ||
"peerDependencies": { | ||
"rhea-promise": "^0.1.15" | ||
"rhea-promise": "^1.0.0" | ||
}, | ||
"devDependencies": { | ||
"@azure/identity": "^1.0.0-preview.1", | ||
"@azure/identity": "1.0.0-preview.2", | ||
"@types/chai": "^4.1.6", | ||
@@ -86,5 +86,5 @@ "@types/chai-as-promised": "^7.1.0", | ||
"@types/node": "^8.0.0", | ||
"@types/sinon": "^5.0.5", | ||
"@typescript-eslint/eslint-plugin": "~1.9.0", | ||
"@typescript-eslint/parser": "^1.7.0", | ||
"@types/sinon": "^7.0.13", | ||
"@typescript-eslint/eslint-plugin": "^1.11.0", | ||
"@typescript-eslint/parser": "^1.11.0", | ||
"assert": "^1.4.1", | ||
@@ -94,5 +94,5 @@ "chai": "^4.2.0", | ||
"cross-env": "^5.2.0", | ||
"dotenv": "^7.0.0", | ||
"dotenv": "^8.0.0", | ||
"eslint": "^5.16.0", | ||
"eslint-config-prettier": "^4.2.0", | ||
"eslint-config-prettier": "^6.0.0", | ||
"eslint-detailed-reporter": "^0.8.0", | ||
@@ -103,3 +103,3 @@ "eslint-plugin-no-null": "^1.0.2", | ||
"karma": "^4.0.1", | ||
"karma-chrome-launcher": "^2.2.0", | ||
"karma-chrome-launcher": "^3.0.0", | ||
"karma-mocha": "^1.3.0", | ||
@@ -113,8 +113,8 @@ "mocha": "^5.2.0", | ||
"rhea": "^1.0.4", | ||
"rhea-promise": "^0.1.15", | ||
"rhea-promise": "^1.0.0", | ||
"rimraf": "^2.6.2", | ||
"rollup": "~1.13.1", | ||
"rollup": "^1.16.3", | ||
"rollup-plugin-commonjs": "^10.0.0", | ||
"rollup-plugin-inject": "^2.2.0", | ||
"rollup-plugin-json": "^3.1.0", | ||
"rollup-plugin-inject": "^3.0.0", | ||
"rollup-plugin-json": "^4.0.0", | ||
"rollup-plugin-multi-entry": "^2.1.0", | ||
@@ -126,9 +126,8 @@ "rollup-plugin-node-globals": "^1.4.0", | ||
"rollup-plugin-sourcemaps": "^0.4.2", | ||
"rollup-plugin-uglify": "^6.0.0", | ||
"rollup-plugin-terser": "^5.1.1", | ||
"sinon": "^7.1.0", | ||
"ts-node": "^7.0.1", | ||
"tslint": "^5.15.0", | ||
"ts-node": "^8.3.0", | ||
"typescript": "^3.2.2", | ||
"ws": "^6.2.1" | ||
"ws": "^7.1.1" | ||
} | ||
} |
@@ -233,4 +233,6 @@ # Azure Core AMQP client library for AMQP operations | ||
If you'd like to contribute to this library, please read the [contributing guide](../../../CONTRIBUTING.md) to learn more about how to build and test the code. | ||
This project has adopted the [Microsoft Open Source Code of Conduct](https://opensource.microsoft.com/codeofconduct/). | ||
For more information see the [Code of Conduct FAQ](https://opensource.microsoft.com/codeofconduct/faq/) or | ||
contact [opencode@microsoft.com](mailto:opencode@microsoft.com) with any additional questions or comments. |
@@ -5,3 +5,3 @@ // Copyright (c) Microsoft Corporation. All rights reserved. | ||
import { SharedKeyCredential } from "./sas"; | ||
import { AccessToken } from "@azure/core-http"; | ||
import { AccessToken } from "@azure/core-auth"; | ||
import { Buffer } from "buffer"; | ||
@@ -8,0 +8,0 @@ |
@@ -5,3 +5,3 @@ // Copyright (c) Microsoft Corporation. All rights reserved. | ||
import { parseConnectionString, ServiceBusConnectionStringModel } from "../util/utils"; | ||
import { AccessToken } from "@azure/core-http"; | ||
import { AccessToken } from "@azure/core-auth"; | ||
import { Buffer } from "buffer"; | ||
@@ -8,0 +8,0 @@ import isBuffer from "is-buffer"; |
@@ -5,3 +5,3 @@ // Copyright (c) Microsoft Corporation. All rights reserved. | ||
import { TokenType } from "./auth/token"; | ||
import { AccessToken } from "@azure/core-http"; | ||
import { AccessToken } from "@azure/core-auth"; | ||
import { | ||
@@ -8,0 +8,0 @@ EventContext, |
@@ -7,3 +7,3 @@ // Copyright (c) Microsoft Corporation. All rights reserved. | ||
import { DataTransformer, DefaultDataTransformer } from "./dataTransformer"; | ||
import { TokenCredential } from "@azure/core-http"; | ||
import { TokenCredential } from "@azure/core-auth"; | ||
import { ConnectionConfig } from "./connectionConfig/connectionConfig"; | ||
@@ -121,7 +121,7 @@ import { SharedKeyCredential } from "./auth/sas"; | ||
/** | ||
* @property {number} [operationTimeoutInSeconds] - The duration in which the promise should | ||
* @property {number} [operationTimeoutInMs] - The duration in which the promise should | ||
* complete (resolve/reject). If it is not completed, then the Promise will be rejected after | ||
* timeout occurs. Default: `60 seconds`. | ||
* timeout occurs. Default: `60000 milliseconds`. | ||
*/ | ||
operationTimeoutInSeconds?: number; | ||
operationTimeoutInMs?: number; | ||
} | ||
@@ -161,3 +161,5 @@ | ||
}, | ||
operationTimeoutInSeconds: parameters.operationTimeoutInSeconds | ||
operationTimeoutInSeconds: parameters.operationTimeoutInMs | ||
? parameters.operationTimeoutInMs / 1000 | ||
: undefined | ||
}; | ||
@@ -164,0 +166,0 @@ |
@@ -129,6 +129,2 @@ // Copyright (c) Microsoft Corporation. All rights reserved. | ||
/** | ||
* Error is thrown when timeout happens for the said operation. | ||
*/ | ||
"amqp:operation-timeout" = "OperationTimeoutError", | ||
/** | ||
* Error is thrown when an argument has a value that is out of the admissible range. | ||
@@ -336,6 +332,2 @@ */ | ||
/** | ||
* Error is thrown when timeout happens for the said operation. | ||
*/ | ||
OperationTimeoutError = "amqp:operation-timeout", | ||
/** | ||
* Error is thrown when an argument has a value that is out of the admissible range. | ||
@@ -494,2 +486,7 @@ */ | ||
"OperationCancelledError", | ||
// OperationTimeoutError occurs when the service fails to respond within a given timeframe. | ||
// Since reasons for such failures can be transient, this is treated as a retryable error. | ||
"OperationTimeoutError", | ||
"SenderBusyError", | ||
@@ -499,3 +496,7 @@ "MessagingError", | ||
"ConnectionForcedError", | ||
"TransferLimitExceededError" | ||
"TransferLimitExceededError", | ||
// InsufficientCreditError occurs when the number of credits available on Rhea link is insufficient. | ||
// Since reasons for such shortage can be transient such as for pending delivery of messages, this is treated as a retryable error. | ||
"InsufficientCreditError" | ||
]; | ||
@@ -569,10 +570,3 @@ | ||
// with user input and not an issue with the Messaging process. | ||
if ( | ||
err instanceof TypeError || | ||
err instanceof RangeError || | ||
// instanceof checks on custom Errors doesn't work without manually setting the prototype within the error. | ||
// Must do a name check until AbortError is updated, and that doesn't break compatibility | ||
// https://github.com/Microsoft/TypeScript/wiki/Breaking-Changes#extending-built-ins-like-error-array-and-map-may-no-longer-work | ||
(err as Error).name === "AbortError" | ||
) { | ||
if (err instanceof TypeError || err instanceof RangeError) { | ||
error.retryable = false; | ||
@@ -605,3 +599,6 @@ return error; | ||
} | ||
} else if (isSystemError(err)) { | ||
return error; | ||
} | ||
if (isSystemError(err)) { | ||
// translate | ||
@@ -621,3 +618,6 @@ const condition = (err as any).code; | ||
} | ||
} else if (isBrowserWebsocketError(err)) { | ||
return error; | ||
} | ||
if (isBrowserWebsocketError(err)) { | ||
// Translate browser communication errors during opening handshake to generic SeviceCommunicationError | ||
@@ -627,8 +627,22 @@ error = new MessagingError("Websocket connection failed."); | ||
error.retryable = false; | ||
} else { | ||
// Translate a generic error into MessagingError. | ||
error = new MessagingError((err as Error).message); | ||
error.stack = (err as Error).stack; | ||
return error; | ||
} | ||
// instanceof checks on custom Errors doesn't work without manually setting the prototype within the error. | ||
// Must do a name check until the custom error is updated, and that doesn't break compatibility | ||
// https://github.com/Microsoft/TypeScript/wiki/Breaking-Changes#extending-built-ins-like-error-array-and-map-may-no-longer-work | ||
const errorName = (err as Error).name; | ||
if (retryableErrors.indexOf(errorName) > -1) { | ||
error.retryable = true; | ||
return error; | ||
} | ||
if (errorName === "AbortError") { | ||
error.retryable = false; | ||
return error; | ||
} | ||
// Translate a generic error into MessagingError. | ||
error = new MessagingError((err as Error).message); | ||
error.stack = (err as Error).stack; | ||
return error; | ||
} |
@@ -7,6 +7,6 @@ // Copyright (c) Microsoft Corporation. All rights reserved. | ||
export { RequestResponseLink, SendRequestOptions } from "./requestResponseLink"; | ||
export { retry, RetryConfig, RetryOperationType } from "./retry"; | ||
export { retry, RetryOptions, RetryConfig, RetryOperationType, RetryMode } from "./retry"; | ||
export { DataTransformer, DefaultDataTransformer } from "./dataTransformer"; | ||
export { TokenType } from "./auth/token"; | ||
export { AccessToken, TokenCredential, isTokenCredential } from "@azure/core-http"; | ||
export { AccessToken, TokenCredential, isTokenCredential } from "@azure/core-auth"; | ||
export { SharedKeyCredential } from "./auth/sas"; | ||
@@ -13,0 +13,0 @@ export { IotSharedKeyCredential } from "./auth/iotSas"; |
@@ -6,3 +6,2 @@ // Copyright (c) Microsoft Corporation. All rights reserved. | ||
import * as Constants from "./util/constants"; | ||
import { retry, RetryConfig, RetryOperationType } from "./retry"; | ||
import { | ||
@@ -19,4 +18,3 @@ Session, | ||
ReceiverEvents, | ||
ReqResLink, | ||
generate_uuid | ||
ReqResLink | ||
} from "rhea-promise"; | ||
@@ -36,17 +34,7 @@ import { translate, ConditionStatusMapper } from "./errors"; | ||
/** | ||
* @property {number} [timeoutInSeconds] Max time to wait for the operation to complete. | ||
* Default: `10 seconds`. | ||
* @property {number} [timeoutInMs] Max time to wait for the operation to complete. | ||
* Default: `60000 milliseconds`. | ||
*/ | ||
timeoutInSeconds?: number; | ||
timeoutInMs?: number; | ||
/** | ||
* @property {number} [times] Number of times the operation needs to be retried in case | ||
* of error. Default: 3. | ||
*/ | ||
times?: number; | ||
/** | ||
* @property {number} [delayInSeconds] Amount of time to wait in seconds before making the | ||
* next attempt. Default: 15. | ||
*/ | ||
delayInSeconds?: number; | ||
/** | ||
* @property {string} [requestName] Name of the request being performed. | ||
@@ -92,5 +80,3 @@ */ | ||
* Sends the given request message and returns the received response. If the operation is not | ||
* completed in the provided timeout in seconds `default: 10`, then the request will be retried | ||
* linearly for the provided number of times `default: 3` with the provided delay in seconds | ||
* `default: 15` between each attempt. | ||
* completed in the provided timeout in milliseconds `default: 60000`, then `OperationTimeoutError` is thrown. | ||
* | ||
@@ -101,168 +87,143 @@ * @param {Message} request The AMQP (request) message. | ||
*/ | ||
sendRequest(request: AmqpMessage, options?: SendRequestOptions): Promise<AmqpMessage> { | ||
if (!options) options = {}; | ||
if (!options.timeoutInSeconds) { | ||
options.timeoutInSeconds = 10; | ||
sendRequest(request: AmqpMessage, options: SendRequestOptions = {}): Promise<AmqpMessage> { | ||
if (!options.timeoutInMs) { | ||
options.timeoutInMs = Constants.defaultOperationTimeoutInMs; | ||
} | ||
let count: number = 0; | ||
const aborter: AbortSignalLike | undefined = options && options.abortSignal; | ||
const aborter: AbortSignalLike | undefined = options.abortSignal; | ||
const sendRequestPromise = () => | ||
new Promise<AmqpMessage>((resolve: any, reject: any) => { | ||
let waitTimer: any; | ||
let timeOver: boolean = false; | ||
type NormalizedInfo = { | ||
statusCode: number; | ||
statusDescription: string; | ||
errorCondition: string; | ||
}; | ||
return new Promise<AmqpMessage>((resolve: any, reject: any) => { | ||
let waitTimer: any; | ||
let timeOver: boolean = false; | ||
type NormalizedInfo = { | ||
statusCode: number; | ||
statusDescription: string; | ||
errorCondition: string; | ||
}; | ||
count++; | ||
if (count !== 1) { | ||
// Generate a new message_id every time after the first attempt | ||
request.message_id = generate_uuid(); | ||
} else if (!request.message_id) { | ||
// Set the message_id in the first attempt only if it is not set | ||
request.message_id = generate_uuid(); | ||
const rejectOnAbort = () => { | ||
const address = this.receiver.address || "address"; | ||
const requestName = options.requestName; | ||
const desc: string = | ||
`[${this.connection.id}] The request "${requestName}" ` + | ||
`to "${address}" has been cancelled by the user.`; | ||
log.error(desc); | ||
const error = new AbortError( | ||
`The ${requestName ? requestName + " " : ""}operation has been cancelled by the user.` | ||
); | ||
reject(error); | ||
}; | ||
const onAbort = () => { | ||
// remove the event listener as this will be registered next time someone makes a request. | ||
this.receiver.removeListener(ReceiverEvents.message, messageCallback); | ||
// safe to clear the timeout if it hasn't already occurred. | ||
if (!timeOver) { | ||
clearTimeout(waitTimer); | ||
} | ||
aborter!.removeEventListener("abort", onAbort); | ||
const rejectOnAbort = () => { | ||
const address = this.receiver.address || "address"; | ||
const requestName = options!.requestName; | ||
const desc: string = | ||
`[${this.connection.id}] The request "${requestName}" ` + | ||
`to "${address}" has been cancelled by the user.`; | ||
log.error(desc); | ||
const error = new AbortError( | ||
`The ${requestName ? requestName + " " : ""}operation has been cancelled by the user.` | ||
); | ||
rejectOnAbort(); | ||
}; | ||
reject(error); | ||
}; | ||
if (aborter) { | ||
// the aborter may have been triggered between request attempts | ||
// so check if it was triggered and reject if needed. | ||
if (aborter.aborted) { | ||
return rejectOnAbort(); | ||
} | ||
aborter.addEventListener("abort", onAbort); | ||
} | ||
const onAbort = () => { | ||
// remove the event listener as this will be registered next time someone makes a request. | ||
this.receiver.removeListener(ReceiverEvents.message, messageCallback); | ||
// safe to clear the timeout if it hasn't already occurred. | ||
if (!timeOver) { | ||
clearTimeout(waitTimer); | ||
} | ||
aborter!.removeEventListener("abort", onAbort); | ||
rejectOnAbort(); | ||
// Handle different variations of property names in responses emitted by EventHubs and ServiceBus. | ||
const getCodeDescriptionAndError = (props: any): NormalizedInfo => { | ||
if (!props) props = {}; | ||
return { | ||
statusCode: (props[Constants.statusCode] || props.statusCode) as number, | ||
statusDescription: (props[Constants.statusDescription] || | ||
props.statusDescription) as string, | ||
errorCondition: (props[Constants.errorCondition] || props.errorCondition) as string | ||
}; | ||
}; | ||
const messageCallback = (context: EventContext) => { | ||
// remove the event listeners as they will be registered next time when someone makes a request. | ||
this.receiver.removeListener(ReceiverEvents.message, messageCallback); | ||
if (aborter) { | ||
// the aborter may have been triggered between request attempts | ||
// so check if it was triggered and reject if needed. | ||
if (aborter.aborted) { | ||
return rejectOnAbort(); | ||
} | ||
aborter.addEventListener("abort", onAbort); | ||
aborter.removeEventListener("abort", onAbort); | ||
} | ||
// Handle different variations of property names in responses emitted by EventHubs and ServiceBus. | ||
const getCodeDescriptionAndError = (props: any): NormalizedInfo => { | ||
if (!props) props = {}; | ||
return { | ||
statusCode: (props[Constants.statusCode] || props.statusCode) as number, | ||
statusDescription: (props[Constants.statusDescription] || | ||
props.statusDescription) as string, | ||
errorCondition: (props[Constants.errorCondition] || props.errorCondition) as string | ||
}; | ||
}; | ||
const messageCallback = (context: EventContext) => { | ||
// remove the event listeners as they will be registered next time when someone makes a request. | ||
this.receiver.removeListener(ReceiverEvents.message, messageCallback); | ||
if (aborter) { | ||
aborter.removeEventListener("abort", onAbort); | ||
} | ||
const info = getCodeDescriptionAndError(context.message!.application_properties); | ||
const responseCorrelationId = context.message!.correlation_id; | ||
log.reqres( | ||
"[%s] %s response: ", | ||
this.connection.id, | ||
request.to || "$management", | ||
context.message | ||
); | ||
if (info.statusCode > 199 && info.statusCode < 300) { | ||
if ( | ||
request.message_id === responseCorrelationId || | ||
request.correlation_id === responseCorrelationId | ||
) { | ||
if (!timeOver) { | ||
clearTimeout(waitTimer); | ||
} | ||
log.reqres( | ||
"[%s] request-messageId | '%s' == '%s' | response-correlationId.", | ||
this.connection.id, | ||
request.message_id, | ||
responseCorrelationId | ||
); | ||
return resolve(context.message); | ||
} else { | ||
log.error( | ||
"[%s] request-messageId | '%s' != '%s' | response-correlationId. " + | ||
"Hence dropping this response and waiting for the next one.", | ||
this.connection.id, | ||
request.message_id, | ||
responseCorrelationId | ||
); | ||
const info = getCodeDescriptionAndError(context.message!.application_properties); | ||
const responseCorrelationId = context.message!.correlation_id; | ||
log.reqres( | ||
"[%s] %s response: ", | ||
this.connection.id, | ||
request.to || "$management", | ||
context.message | ||
); | ||
if (info.statusCode > 199 && info.statusCode < 300) { | ||
if ( | ||
request.message_id === responseCorrelationId || | ||
request.correlation_id === responseCorrelationId | ||
) { | ||
if (!timeOver) { | ||
clearTimeout(waitTimer); | ||
} | ||
log.reqres( | ||
"[%s] request-messageId | '%s' == '%s' | response-correlationId.", | ||
this.connection.id, | ||
request.message_id, | ||
responseCorrelationId | ||
); | ||
return resolve(context.message); | ||
} else { | ||
const condition = | ||
info.errorCondition || | ||
ConditionStatusMapper[info.statusCode] || | ||
"amqp:internal-error"; | ||
const e: AmqpError = { | ||
condition: condition, | ||
description: info.statusDescription | ||
}; | ||
const error = translate(e); | ||
log.error(error); | ||
return reject(error); | ||
log.error( | ||
"[%s] request-messageId | '%s' != '%s' | response-correlationId. " + | ||
"Hence dropping this response and waiting for the next one.", | ||
this.connection.id, | ||
request.message_id, | ||
responseCorrelationId | ||
); | ||
} | ||
}; | ||
const actionAfterTimeout = () => { | ||
timeOver = true; | ||
this.receiver.removeListener(ReceiverEvents.message, messageCallback); | ||
if (aborter) { | ||
aborter.removeEventListener("abort", onAbort); | ||
} | ||
const address = this.receiver.address || "address"; | ||
const desc: string = | ||
`The request with message_id "${request.message_id}" to "${address}" ` + | ||
`endpoint timed out. Please try again later.`; | ||
} else { | ||
const condition = | ||
info.errorCondition || ConditionStatusMapper[info.statusCode] || "amqp:internal-error"; | ||
const e: AmqpError = { | ||
condition: ConditionStatusMapper[408], | ||
description: desc | ||
condition: condition, | ||
description: info.statusDescription | ||
}; | ||
return reject(translate(e)); | ||
const error = translate(e); | ||
log.error(error); | ||
return reject(error); | ||
} | ||
}; | ||
const actionAfterTimeout = () => { | ||
timeOver = true; | ||
this.receiver.removeListener(ReceiverEvents.message, messageCallback); | ||
if (aborter) { | ||
aborter.removeEventListener("abort", onAbort); | ||
} | ||
const address = this.receiver.address || "address"; | ||
const desc: string = | ||
`The request with message_id "${request.message_id}" to "${address}" ` + | ||
`endpoint timed out. Please try again later.`; | ||
const e: Error = { | ||
name: "OperationTimeoutError", | ||
message: desc | ||
}; | ||
return reject(translate(e)); | ||
}; | ||
this.receiver.on(ReceiverEvents.message, messageCallback); | ||
waitTimer = setTimeout(actionAfterTimeout, options!.timeoutInSeconds! * 1000); | ||
log.reqres( | ||
"[%s] %s request sent: %O", | ||
this.connection.id, | ||
request.to || "$managment", | ||
request | ||
); | ||
this.sender.send(request); | ||
}); | ||
const config: RetryConfig<AmqpMessage> = { | ||
operation: sendRequestPromise, | ||
connectionId: this.connection.id, | ||
operationType: | ||
request.to && request.to === Constants.cbsEndpoint | ||
? RetryOperationType.cbsAuth | ||
: RetryOperationType.management, | ||
delayInSeconds: options.delayInSeconds, | ||
times: options.times | ||
}; | ||
return retry<AmqpMessage>(config); | ||
waitTimer = setTimeout(actionAfterTimeout, options.timeoutInMs); | ||
this.receiver.on(ReceiverEvents.message, messageCallback); | ||
log.reqres( | ||
"[%s] %s request sent: %O", | ||
this.connection.id, | ||
request.to || "$managment", | ||
request | ||
); | ||
this.sender.send(request); | ||
}); | ||
} | ||
@@ -269,0 +230,0 @@ |
137
src/retry.ts
@@ -7,4 +7,9 @@ // Copyright (c) Microsoft Corporation. All rights reserved. | ||
import * as log from "./log"; | ||
import { defaultRetryAttempts, defaultDelayBetweenRetriesInSeconds } from "./util/constants"; | ||
import { | ||
defaultMaxRetries, | ||
defaultDelayBetweenOperationRetriesInMs, | ||
defaultMaxDelayForExponentialRetryInMs | ||
} from "./util/constants"; | ||
import { resolve } from "dns"; | ||
import { AbortSignalLike } from "@azure/abort-controller"; | ||
@@ -30,2 +35,11 @@ /** | ||
/** | ||
* Describes the Retry Mode type | ||
* @enum RetryMode | ||
*/ | ||
export enum RetryMode { | ||
Exponential, | ||
Fixed | ||
} | ||
/** | ||
* Describes the retry operation type. | ||
@@ -46,2 +60,34 @@ * @enum RetryOperationType | ||
/** | ||
* Retry policy options that determine the mode, number of retries, retry interval etc. | ||
*/ | ||
export interface RetryOptions { | ||
/** | ||
* @property {number} [maxRetries] Number of times the operation needs to be retried in case | ||
* of retryable error. Default: 3. | ||
*/ | ||
maxRetries?: number; | ||
/** | ||
* @property {number} [retryDelayInMs] Amount of time to wait in milliseconds before making the | ||
* next attempt. Default: `30000 milliseconds`. | ||
* When `mode` option is set to `Exponential`, | ||
* this is used to compute the exponentially increasing delays between retries. | ||
*/ | ||
retryDelayInMs?: number; | ||
/** | ||
* Number of milliseconds to wait before declaring that current attempt has timed out which will trigger a retry | ||
* A minimum value of `60000` milliseconds will be used if a value not greater than this is provided. | ||
*/ | ||
timeoutInMs?: number; | ||
/** | ||
* @property {RetryMode} [mode] Denotes which retry mode to apply. If undefined, defaults to `Fixed` | ||
*/ | ||
mode?: RetryMode; | ||
/** | ||
* @property {number} [maxRetryDelayInMs] Denotes the maximum delay between retries | ||
* that the retry attempts will be capped at. Applicable only when performing exponential retry. | ||
*/ | ||
maxRetryDelayInMs?: number; | ||
} | ||
/** | ||
* Describes the parameters that need to be configured for the retry operation. | ||
@@ -66,16 +112,15 @@ * @interface RetryConfig | ||
/** | ||
* @property {number} [times] Number of times the operation needs to be retried in case | ||
* of error. Default: 3. | ||
* @property {string} connectionHost The host "<yournamespace>.servicebus.windows.net". | ||
* Used to check network connectivity. | ||
*/ | ||
times?: number; | ||
connectionHost?: string; | ||
/** | ||
* @property {number} [delayInSeconds] Amount of time to wait in seconds before making the | ||
* next attempt. Default: 15. | ||
* @property {RetryOptions} retryOptions The retry related options associated with given operation execution. | ||
*/ | ||
delayInSeconds?: number; | ||
retryOptions?: RetryOptions; | ||
/** | ||
* @property {string} connectionHost The host "<yournamespace>.servicebus.windows.net". | ||
* Used to check network connectivity. | ||
* @property {AbortSignalLike} [abortSignal] The `AbortSignal` associated with the operation being retried on. | ||
* If this signal is fired during the wait time between retries, then the `retry()` method will ensure that the wait is abandoned and the retry process gets cancelled. If this signal is fired when the operation is in progress, then the operation is expected to react to it. | ||
*/ | ||
connectionHost?: string; | ||
abortSignal?: AbortSignalLike; | ||
} | ||
@@ -118,7 +163,14 @@ | ||
/** | ||
* It will attempt to linearly retry an operation specified number of times with a specified | ||
* delay in between each retry. The retries will only happen if the error is retryable. | ||
* Every operation is attempted at least once. Additional attempts are made if the previous attempt failed | ||
* with a retryable error. The number of additional attempts is governed by the `maxRetries` property provided | ||
* on the `RetryConfig` argument. | ||
* | ||
* @param {RetryConfig<T>} config Parameters to configure retry operation. | ||
* If `mode` option is set to `Fixed`, then the retries are made on the | ||
* given operation for a specified number of times, with a fixed delay in between each retry each time. | ||
* | ||
* If `mode` option is set to `Exponential`, then the delay between retries is adjusted to increase | ||
* exponentially with each attempt using back-off factor of power 2. | ||
* | ||
* @param {RetryConfig<T>} config Parameters to configure retry operation | ||
* | ||
* @return {Promise<T>} Promise<T>. | ||
@@ -128,17 +180,26 @@ */ | ||
validateRetryConfig(config); | ||
if (config.times == undefined) config.times = defaultRetryAttempts; | ||
if (config.delayInSeconds == undefined) { | ||
config.delayInSeconds = defaultDelayBetweenRetriesInSeconds; | ||
if (!config.retryOptions) { | ||
config.retryOptions = {}; | ||
} | ||
if (config.retryOptions.maxRetries == undefined || config.retryOptions.maxRetries < 0) { | ||
config.retryOptions.maxRetries = defaultMaxRetries; | ||
} | ||
if (config.retryOptions.retryDelayInMs == undefined || config.retryOptions.retryDelayInMs < 0) { | ||
config.retryOptions.retryDelayInMs = defaultDelayBetweenOperationRetriesInMs; | ||
} | ||
if ( | ||
config.retryOptions.maxRetryDelayInMs == undefined || | ||
config.retryOptions.maxRetryDelayInMs < 0 | ||
) { | ||
config.retryOptions.maxRetryDelayInMs = defaultMaxDelayForExponentialRetryInMs; | ||
} | ||
if (config.retryOptions.mode == undefined) { | ||
config.retryOptions.mode = RetryMode.Fixed; | ||
} | ||
let lastError: MessagingError | undefined; | ||
let result: any; | ||
let success = false; | ||
for (let i = 0; i < config.times; i++) { | ||
const j = i + 1; | ||
log.retry( | ||
"[%s] Retry for '%s', attempt number: %d", | ||
config.connectionId, | ||
config.operationType, | ||
j | ||
); | ||
const totalNumberOfAttempts = config.retryOptions.maxRetries + 1; | ||
for (let i = 1; i <= totalNumberOfAttempts; i++) { | ||
log.retry("[%s] Attempt number: %d", config.connectionId, config.operationType, i); | ||
try { | ||
@@ -151,3 +212,3 @@ result = await config.operation(); | ||
config.operationType, | ||
j | ||
i | ||
); | ||
@@ -180,13 +241,31 @@ if (result && !isDelivery(result)) { | ||
config.operationType, | ||
j, | ||
i, | ||
err | ||
); | ||
let targetDelayInMs = config.retryOptions.retryDelayInMs; | ||
if (config.retryOptions.mode === RetryMode.Exponential) { | ||
let incrementDelta = Math.pow(2, i) - 1; | ||
const boundedRandDelta = | ||
config.retryOptions.retryDelayInMs * 0.8 + | ||
Math.floor( | ||
Math.random() * | ||
(config.retryOptions.retryDelayInMs * 1.2 - config.retryOptions.retryDelayInMs * 0.8) | ||
); | ||
incrementDelta *= boundedRandDelta; | ||
targetDelayInMs = Math.min(incrementDelta, config.retryOptions.maxRetryDelayInMs); | ||
} | ||
if (lastError && lastError.retryable) { | ||
log.error( | ||
"[%s] Sleeping for %d seconds for '%s'.", | ||
"[%s] Sleeping for %d milliseconds for '%s'.", | ||
config.connectionId, | ||
config.delayInSeconds, | ||
targetDelayInMs, | ||
config.operationType | ||
); | ||
await delay(config.delayInSeconds * 1000); | ||
await delay( | ||
targetDelayInMs, | ||
config.abortSignal, | ||
`The retry operation has been cancelled by the user.` | ||
); | ||
continue; | ||
@@ -193,0 +272,0 @@ } else { |
@@ -53,3 +53,3 @@ // Copyright (c) Microsoft Corporation. All rights reserved. | ||
export const connectionError = "connection_error"; | ||
export const defaultOperationTimeoutInSeconds = 60; | ||
export const defaultOperationTimeoutInMs = 60000; | ||
export const managementRequestKey = "managementRequest"; | ||
@@ -75,8 +75,8 @@ export const negotiateCbsKey = "negotiateCbs"; | ||
export const maxAbsoluteExpiryTime = new Date("9999-12-31T07:59:59.000Z").getTime(); | ||
export const aadTokenValidityMarginSeconds = 5; | ||
export const aadTokenValidityMarginInMs = 5000; | ||
export const connectionReconnectDelay = 300; | ||
export const defaultRetryAttempts = 3; | ||
export const defaultConnectionRetryAttempts = 150; | ||
export const defaultDelayBetweenOperationRetriesInSeconds = 5; | ||
export const defaultDelayBetweenRetriesInSeconds = 15; | ||
export const defaultMaxRetries = 3; | ||
export const defaultMaxRetriesForConnection = 150; | ||
export const defaultDelayBetweenOperationRetriesInMs = 30000; | ||
export const defaultMaxDelayForExponentialRetryInMs = 90000; | ||
export const receiverSettleMode = "receiver-settle-mode"; | ||
@@ -83,0 +83,0 @@ export const dispositionStatus = "disposition-status"; |
@@ -5,2 +5,4 @@ // Copyright (c) Microsoft Corporation. All rights reserved. | ||
import AsyncLock from "async-lock"; | ||
import { AbortSignalLike, AbortError } from "@azure/abort-controller"; | ||
export { AsyncLock }; | ||
@@ -202,8 +204,46 @@ /** | ||
* A wrapper for setTimeout that resolves a promise after t milliseconds. | ||
* @param {number} t - The number of milliseconds to be delayed. | ||
* @param {number} delayInMs - The number of milliseconds to be delayed. | ||
* @param {AbortSignalLike} abortSignal - The abortSignal associated with containing operation. | ||
* @param {string} abortErrorMsg - The abort error message associated with containing operation. | ||
* @param {T} value - The value to be resolved with after a timeout of t milliseconds. | ||
* @returns {Promise<T>} - Resolved promise | ||
*/ | ||
export function delay<T>(t: number, value?: T): Promise<T> { | ||
return new Promise((resolve) => setTimeout(() => resolve(value), t)); | ||
export function delay<T>( | ||
delayInMs: number, | ||
abortSignal?: AbortSignalLike, | ||
abortErrorMsg?: string, | ||
value?: T | ||
): Promise<T> { | ||
return new Promise((resolve, reject) => { | ||
const rejectOnAbort = () => { | ||
return reject( | ||
new AbortError(abortErrorMsg ? abortErrorMsg : `The delay was cancelled by the user.`) | ||
); | ||
}; | ||
const removeListeners = () => { | ||
if (abortSignal) { | ||
abortSignal.removeEventListener("abort", onAborted); | ||
} | ||
}; | ||
const onAborted = () => { | ||
clearTimeout(timer); | ||
removeListeners(); | ||
return rejectOnAbort(); | ||
}; | ||
if (abortSignal && abortSignal.aborted) { | ||
return rejectOnAbort(); | ||
} | ||
const timer = setTimeout(() => { | ||
removeListeners(); | ||
resolve(value); | ||
}, delayInMs); | ||
if (abortSignal) { | ||
abortSignal.addEventListener("abort", onAborted); | ||
} | ||
}); | ||
} | ||
@@ -210,0 +250,0 @@ |
import { SharedKeyCredential } from "./sas"; | ||
import { AccessToken } from "@azure/core-http"; | ||
import { AccessToken } from "@azure/core-auth"; | ||
/** | ||
@@ -4,0 +4,0 @@ * @class IotSharedKeyCredential |
/// <reference types="node" /> | ||
import { AccessToken } from "@azure/core-http"; | ||
import { AccessToken } from "@azure/core-auth"; | ||
/** | ||
@@ -4,0 +4,0 @@ * @class SharedKeyCredential |
import { TokenType } from "./auth/token"; | ||
import { AccessToken } from "@azure/core-http"; | ||
import { AccessToken } from "@azure/core-auth"; | ||
import { Connection } from "rhea-promise"; | ||
@@ -4,0 +4,0 @@ /** |
import { Connection } from "rhea-promise"; | ||
import { CbsClient } from "./cbs"; | ||
import { DataTransformer } from "./dataTransformer"; | ||
import { TokenCredential } from "@azure/core-http"; | ||
import { TokenCredential } from "@azure/core-auth"; | ||
import { ConnectionConfig } from "./connectionConfig/connectionConfig"; | ||
@@ -110,7 +110,7 @@ import { SharedKeyCredential } from "./auth/sas"; | ||
/** | ||
* @property {number} [operationTimeoutInSeconds] - The duration in which the promise should | ||
* @property {number} [operationTimeoutInMs] - The duration in which the promise should | ||
* complete (resolve/reject). If it is not completed, then the Promise will be rejected after | ||
* timeout occurs. Default: `60 seconds`. | ||
* timeout occurs. Default: `60000 milliseconds`. | ||
*/ | ||
operationTimeoutInSeconds?: number; | ||
operationTimeoutInMs?: number; | ||
} | ||
@@ -117,0 +117,0 @@ export declare module ConnectionContextBase { |
@@ -123,6 +123,2 @@ import { AmqpError } from "rhea-promise"; | ||
/** | ||
* Error is thrown when timeout happens for the said operation. | ||
*/ | ||
"amqp:operation-timeout" = "OperationTimeoutError", | ||
/** | ||
* Error is thrown when an argument has a value that is out of the admissible range. | ||
@@ -329,6 +325,2 @@ */ | ||
/** | ||
* Error is thrown when timeout happens for the said operation. | ||
*/ | ||
OperationTimeoutError = "amqp:operation-timeout", | ||
/** | ||
* Error is thrown when an argument has a value that is out of the admissible range. | ||
@@ -335,0 +327,0 @@ */ |
/// <reference lib="es2015" /> | ||
export { RequestResponseLink, SendRequestOptions } from "./requestResponseLink"; | ||
export { retry, RetryConfig, RetryOperationType } from "./retry"; | ||
export { retry, RetryOptions, RetryConfig, RetryOperationType, RetryMode } from "./retry"; | ||
export { DataTransformer, DefaultDataTransformer } from "./dataTransformer"; | ||
export { TokenType } from "./auth/token"; | ||
export { AccessToken, TokenCredential, isTokenCredential } from "@azure/core-http"; | ||
export { AccessToken, TokenCredential, isTokenCredential } from "@azure/core-auth"; | ||
export { SharedKeyCredential } from "./auth/sas"; | ||
@@ -8,0 +8,0 @@ export { IotSharedKeyCredential } from "./auth/iotSas"; |
@@ -13,17 +13,7 @@ import { AbortSignalLike } from "@azure/abort-controller"; | ||
/** | ||
* @property {number} [timeoutInSeconds] Max time to wait for the operation to complete. | ||
* Default: `10 seconds`. | ||
* @property {number} [timeoutInMs] Max time to wait for the operation to complete. | ||
* Default: `60000 milliseconds`. | ||
*/ | ||
timeoutInSeconds?: number; | ||
timeoutInMs?: number; | ||
/** | ||
* @property {number} [times] Number of times the operation needs to be retried in case | ||
* of error. Default: 3. | ||
*/ | ||
times?: number; | ||
/** | ||
* @property {number} [delayInSeconds] Amount of time to wait in seconds before making the | ||
* next attempt. Default: 15. | ||
*/ | ||
delayInSeconds?: number; | ||
/** | ||
* @property {string} [requestName] Name of the request being performed. | ||
@@ -60,5 +50,3 @@ */ | ||
* Sends the given request message and returns the received response. If the operation is not | ||
* completed in the provided timeout in seconds `default: 10`, then the request will be retried | ||
* linearly for the provided number of times `default: 3` with the provided delay in seconds | ||
* `default: 15` between each attempt. | ||
* completed in the provided timeout in milliseconds `default: 60000`, then `OperationTimeoutError` is thrown. | ||
* | ||
@@ -65,0 +53,0 @@ * @param {Message} request The AMQP (request) message. |
@@ -0,2 +1,11 @@ | ||
import { AbortSignalLike } from "@azure/abort-controller"; | ||
/** | ||
* Describes the Retry Mode type | ||
* @enum RetryMode | ||
*/ | ||
export declare enum RetryMode { | ||
Exponential = 0, | ||
Fixed = 1 | ||
} | ||
/** | ||
* Describes the retry operation type. | ||
@@ -16,2 +25,33 @@ * @enum RetryOperationType | ||
/** | ||
* Retry policy options that determine the mode, number of retries, retry interval etc. | ||
*/ | ||
export interface RetryOptions { | ||
/** | ||
* @property {number} [maxRetries] Number of times the operation needs to be retried in case | ||
* of retryable error. Default: 3. | ||
*/ | ||
maxRetries?: number; | ||
/** | ||
* @property {number} [retryDelayInMs] Amount of time to wait in milliseconds before making the | ||
* next attempt. Default: `30000 milliseconds`. | ||
* When `mode` option is set to `Exponential`, | ||
* this is used to compute the exponentially increasing delays between retries. | ||
*/ | ||
retryDelayInMs?: number; | ||
/** | ||
* Number of milliseconds to wait before declaring that current attempt has timed out which will trigger a retry | ||
* A minimum value of `60000` milliseconds will be used if a value not greater than this is provided. | ||
*/ | ||
timeoutInMs?: number; | ||
/** | ||
* @property {RetryMode} [mode] Denotes which retry mode to apply. If undefined, defaults to `Fixed` | ||
*/ | ||
mode?: RetryMode; | ||
/** | ||
* @property {number} [maxRetryDelayInMs] Denotes the maximum delay between retries | ||
* that the retry attempts will be capped at. Applicable only when performing exponential retry. | ||
*/ | ||
maxRetryDelayInMs?: number; | ||
} | ||
/** | ||
* Describes the parameters that need to be configured for the retry operation. | ||
@@ -36,23 +76,29 @@ * @interface RetryConfig | ||
/** | ||
* @property {number} [times] Number of times the operation needs to be retried in case | ||
* of error. Default: 3. | ||
* @property {string} connectionHost The host "<yournamespace>.servicebus.windows.net". | ||
* Used to check network connectivity. | ||
*/ | ||
times?: number; | ||
connectionHost?: string; | ||
/** | ||
* @property {number} [delayInSeconds] Amount of time to wait in seconds before making the | ||
* next attempt. Default: 15. | ||
* @property {RetryOptions} retryOptions The retry related options associated with given operation execution. | ||
*/ | ||
delayInSeconds?: number; | ||
retryOptions?: RetryOptions; | ||
/** | ||
* @property {string} connectionHost The host "<yournamespace>.servicebus.windows.net". | ||
* Used to check network connectivity. | ||
* @property {AbortSignalLike} [abortSignal] The `AbortSignal` associated with the operation being retried on. | ||
* If this signal is fired during the wait time between retries, then the `retry()` method will ensure that the wait is abandoned and the retry process gets cancelled. If this signal is fired when the operation is in progress, then the operation is expected to react to it. | ||
*/ | ||
connectionHost?: string; | ||
abortSignal?: AbortSignalLike; | ||
} | ||
/** | ||
* It will attempt to linearly retry an operation specified number of times with a specified | ||
* delay in between each retry. The retries will only happen if the error is retryable. | ||
* Every operation is attempted at least once. Additional attempts are made if the previous attempt failed | ||
* with a retryable error. The number of additional attempts is governed by the `maxRetries` property provided | ||
* on the `RetryConfig` argument. | ||
* | ||
* @param {RetryConfig<T>} config Parameters to configure retry operation. | ||
* If `mode` option is set to `Fixed`, then the retries are made on the | ||
* given operation for a specified number of times, with a fixed delay in between each retry each time. | ||
* | ||
* If `mode` option is set to `Exponential`, then the delay between retries is adjusted to increase | ||
* exponentially with each attempt using back-off factor of power 2. | ||
* | ||
* @param {RetryConfig<T>} config Parameters to configure retry operation | ||
* | ||
* @return {Promise<T>} Promise<T>. | ||
@@ -59,0 +105,0 @@ */ |
@@ -50,3 +50,3 @@ export declare const associatedLinkName = "associated-link-name"; | ||
export declare const connectionError = "connection_error"; | ||
export declare const defaultOperationTimeoutInSeconds = 60; | ||
export declare const defaultOperationTimeoutInMs = 60000; | ||
export declare const managementRequestKey = "managementRequest"; | ||
@@ -69,8 +69,8 @@ export declare const negotiateCbsKey = "negotiateCbs"; | ||
export declare const maxAbsoluteExpiryTime: number; | ||
export declare const aadTokenValidityMarginSeconds = 5; | ||
export declare const aadTokenValidityMarginInMs = 5000; | ||
export declare const connectionReconnectDelay = 300; | ||
export declare const defaultRetryAttempts = 3; | ||
export declare const defaultConnectionRetryAttempts = 150; | ||
export declare const defaultDelayBetweenOperationRetriesInSeconds = 5; | ||
export declare const defaultDelayBetweenRetriesInSeconds = 15; | ||
export declare const defaultMaxRetries = 3; | ||
export declare const defaultMaxRetriesForConnection = 150; | ||
export declare const defaultDelayBetweenOperationRetriesInMs = 30000; | ||
export declare const defaultMaxDelayForExponentialRetryInMs = 90000; | ||
export declare const receiverSettleMode = "receiver-settle-mode"; | ||
@@ -77,0 +77,0 @@ export declare const dispositionStatus = "disposition-status"; |
import AsyncLock from "async-lock"; | ||
import { AbortSignalLike } from "@azure/abort-controller"; | ||
export { AsyncLock }; | ||
@@ -121,7 +122,9 @@ /** | ||
* A wrapper for setTimeout that resolves a promise after t milliseconds. | ||
* @param {number} t - The number of milliseconds to be delayed. | ||
* @param {number} delayInMs - The number of milliseconds to be delayed. | ||
* @param {AbortSignalLike} abortSignal - The abortSignal associated with containing operation. | ||
* @param {string} abortErrorMsg - The abort error message associated with containing operation. | ||
* @param {T} value - The value to be resolved with after a timeout of t milliseconds. | ||
* @returns {Promise<T>} - Resolved promise | ||
*/ | ||
export declare function delay<T>(t: number, value?: T): Promise<T>; | ||
export declare function delay<T>(delayInMs: number, abortSignal?: AbortSignalLike, abortErrorMsg?: string, value?: T): Promise<T>; | ||
/** | ||
@@ -128,0 +131,0 @@ * Generates a random number between the given interval |
Sorry, the diff of this file is too big to display
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is too big to display
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
49
23427
238
2301024
+ Added@azure/core-auth@1.0.0-preview.2(transitive)
+ Addedavailable-typed-arrays@1.0.7(transitive)
+ Addedcall-bind@1.0.8(transitive)
+ Addeddefine-data-property@1.1.4(transitive)
+ Addedfor-each@0.3.3(transitive)
+ Addedhas-property-descriptors@1.0.2(transitive)
+ Addedhas-tostringtag@1.0.2(transitive)
+ Addedis-arguments@1.2.0(transitive)
+ Addedis-callable@1.2.7(transitive)
+ Addedis-generator-function@1.1.0(transitive)
+ Addedis-regex@1.2.1(transitive)
+ Addedis-typed-array@1.1.15(transitive)
+ Addedpossible-typed-array-names@1.0.0(transitive)
+ Addedrhea-promise@1.2.1(transitive)
+ Addedsafe-regex-test@1.1.0(transitive)
+ Addedset-function-length@1.2.2(transitive)
+ Addedutil@0.12.5(transitive)
+ Addedwhich-typed-array@1.1.18(transitive)
- Removed@azure/core-http@^1.0.0-preview.1
- Removed@azure/abort-controller@1.1.02.1.2(transitive)
- Removed@azure/core-asynciterator-polyfill@1.0.2(transitive)
- Removed@azure/core-auth@1.9.0(transitive)
- Removed@azure/core-http@1.2.6(transitive)
- Removed@azure/core-tracing@1.0.0-preview.11(transitive)
- Removed@azure/core-util@1.11.0(transitive)
- Removed@azure/logger@1.1.4(transitive)
- Removed@opencensus/web-types@0.0.7(transitive)
- Removed@opentelemetry/api@1.0.0-rc.0(transitive)
- Removed@types/node-fetch@2.6.12(transitive)
- Removed@types/tunnel@0.0.1(transitive)
- Removedasynckit@0.4.0(transitive)
- Removedcombined-stream@1.0.8(transitive)
- Removeddelayed-stream@1.0.0(transitive)
- Removedform-data@3.0.24.0.1(transitive)
- Removedinherits@2.0.3(transitive)
- Removedmime-db@1.52.0(transitive)
- Removedmime-types@2.1.35(transitive)
- Removednode-fetch@2.7.0(transitive)
- Removedpsl@1.15.0(transitive)
- Removedpunycode@2.3.1(transitive)
- Removedquerystringify@2.2.0(transitive)
- Removedrequires-port@1.0.0(transitive)
- Removedrhea-promise@0.1.15(transitive)
- Removedsax@1.4.1(transitive)
- Removedtough-cookie@4.1.4(transitive)
- Removedtr46@0.0.3(transitive)
- Removedtslib@2.8.1(transitive)
- Removedtunnel@0.0.6(transitive)
- Removeduniversalify@0.2.0(transitive)
- Removedurl-parse@1.5.10(transitive)
- Removedutil@0.11.1(transitive)
- Removeduuid@8.3.2(transitive)
- Removedwebidl-conversions@3.0.1(transitive)
- Removedwhatwg-url@5.0.0(transitive)
- Removedxml2js@0.4.23(transitive)
- Removedxmlbuilder@11.0.1(transitive)
Updatedutil@^0.12.1