@fluidframework/driver-utils
Advanced tools
Comparing version 0.39.3 to 0.40.0-25719
{ | ||
"$schema": "https://developer.microsoft.com/json-schemas/api-extractor/v7/api-extractor.schema.json", | ||
"extends": "@fluidframework/build-common/api-extractor-common.json" | ||
"extends": "@fluidframework/build-common/api-extractor-common-report.json" | ||
} |
@@ -17,2 +17,3 @@ /*! | ||
export * from "./networkUtils"; | ||
export * from "./runWithRetry"; | ||
//# sourceMappingURL=index.d.ts.map |
@@ -29,2 +29,3 @@ "use strict"; | ||
__exportStar(require("./networkUtils"), exports); | ||
__exportStar(require("./runWithRetry"), exports); | ||
//# sourceMappingURL=index.js.map |
@@ -19,4 +19,10 @@ /*! | ||
readonly errorType = DriverErrorType.genericNetworkError; | ||
constructor(errorMessage: string, canRetry: boolean, statusCode: number | undefined); | ||
constructor(errorMessage: string, canRetry: boolean, props?: ITaggableTelemetryProperties); | ||
} | ||
export declare class DeltaStreamConnectionForbiddenError extends LoggingError { | ||
static readonly errorType: string; | ||
readonly errorType: string; | ||
readonly canRetry = false; | ||
constructor(errorMessage: string); | ||
} | ||
export declare class AuthorizationError extends LoggingError implements IAuthorizationError { | ||
@@ -27,3 +33,3 @@ readonly claims: string | undefined; | ||
readonly canRetry = false; | ||
constructor(errorMessage: string, claims: string | undefined, tenantId: string | undefined, statusCode: number); | ||
constructor(errorMessage: string, claims: string | undefined, tenantId: string | undefined, props?: ITaggableTelemetryProperties); | ||
} | ||
@@ -50,6 +56,6 @@ export declare class NetworkErrorBasic<T> extends LoggingError { | ||
readonly canRetry = true; | ||
constructor(errorMessage: string, retryAfterSeconds: number, statusCode?: number); | ||
constructor(errorMessage: string, retryAfterSeconds: number, props?: ITaggableTelemetryProperties); | ||
} | ||
export declare const createWriteError: (errorMessage: string) => NonRetryableError<DriverErrorType>; | ||
export declare function createGenericNetworkError(errorMessage: string, canRetry: boolean, retryAfterSeconds?: number, statusCode?: number): GenericNetworkError | ThrottlingError; | ||
export declare function createGenericNetworkError(errorMessage: string, canRetry: boolean, retryAfterSeconds?: number, props?: ITaggableTelemetryProperties): GenericNetworkError | ThrottlingError; | ||
/** | ||
@@ -56,0 +62,0 @@ * Check if a connection error can be retried. Unless explicitly allowed, retry is disallowed. |
@@ -6,4 +6,5 @@ "use strict"; | ||
*/ | ||
var _a; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
exports.getRetryDelayFromError = exports.canRetryOnError = exports.createGenericNetworkError = exports.createWriteError = exports.ThrottlingError = exports.RetryableError = exports.NonRetryableError = exports.NetworkErrorBasic = exports.AuthorizationError = exports.GenericNetworkError = exports.isOnline = exports.OnlineStatus = void 0; | ||
exports.getRetryDelayFromError = exports.canRetryOnError = exports.createGenericNetworkError = exports.createWriteError = exports.ThrottlingError = exports.RetryableError = exports.NonRetryableError = exports.NetworkErrorBasic = exports.AuthorizationError = exports.DeltaStreamConnectionForbiddenError = exports.GenericNetworkError = exports.isOnline = exports.OnlineStatus = void 0; | ||
const driver_definitions_1 = require("@fluidframework/driver-definitions"); | ||
@@ -32,4 +33,4 @@ const telemetry_utils_1 = require("@fluidframework/telemetry-utils"); | ||
class GenericNetworkError extends telemetry_utils_1.LoggingError { | ||
constructor(errorMessage, canRetry, statusCode) { | ||
super(errorMessage, { statusCode }); | ||
constructor(errorMessage, canRetry, props) { | ||
super(errorMessage, props); | ||
this.canRetry = canRetry; | ||
@@ -40,5 +41,20 @@ this.errorType = driver_definitions_1.DriverErrorType.genericNetworkError; | ||
exports.GenericNetworkError = GenericNetworkError; | ||
// Todo GH #6214: Remove after next drive def bump. This is necessary as there is no | ||
// compatible way to augment an enum, as it can't be optional. So for now | ||
// we need to duplicate the value here. We likely need to rethink our | ||
// DriverErrorType strategy so that it supports extension with optional | ||
// value. | ||
const deltaStreamConnectionForbiddenStr = "deltaStreamConnectionForbidden"; | ||
class DeltaStreamConnectionForbiddenError extends telemetry_utils_1.LoggingError { | ||
constructor(errorMessage) { | ||
super(errorMessage, { statusCode: 400 }); | ||
this.errorType = DeltaStreamConnectionForbiddenError.errorType; | ||
this.canRetry = false; | ||
} | ||
} | ||
exports.DeltaStreamConnectionForbiddenError = DeltaStreamConnectionForbiddenError; | ||
DeltaStreamConnectionForbiddenError.errorType = (_a = driver_definitions_1.DriverErrorType[deltaStreamConnectionForbiddenStr]) !== null && _a !== void 0 ? _a : deltaStreamConnectionForbiddenStr; | ||
class AuthorizationError extends telemetry_utils_1.LoggingError { | ||
constructor(errorMessage, claims, tenantId, statusCode) { | ||
super(errorMessage, { statusCode }); | ||
constructor(errorMessage, claims, tenantId, props) { | ||
super(errorMessage, props); | ||
this.claims = claims; | ||
@@ -77,4 +93,4 @@ this.tenantId = tenantId; | ||
class ThrottlingError extends telemetry_utils_1.LoggingError { | ||
constructor(errorMessage, retryAfterSeconds, statusCode) { | ||
super(errorMessage, { statusCode }); | ||
constructor(errorMessage, retryAfterSeconds, props) { | ||
super(errorMessage, props); | ||
this.retryAfterSeconds = retryAfterSeconds; | ||
@@ -88,7 +104,7 @@ this.errorType = driver_definitions_1.DriverErrorType.throttlingError; | ||
exports.createWriteError = createWriteError; | ||
function createGenericNetworkError(errorMessage, canRetry, retryAfterSeconds, statusCode) { | ||
function createGenericNetworkError(errorMessage, canRetry, retryAfterSeconds, props) { | ||
if (retryAfterSeconds !== undefined && canRetry) { | ||
return new ThrottlingError(errorMessage, retryAfterSeconds, statusCode); | ||
return new ThrottlingError(errorMessage, retryAfterSeconds, props); | ||
} | ||
return new GenericNetworkError(errorMessage, canRetry, statusCode); | ||
return new GenericNetworkError(errorMessage, canRetry, props); | ||
} | ||
@@ -103,5 +119,5 @@ exports.createGenericNetworkError = createGenericNetworkError; | ||
exports.canRetryOnError = canRetryOnError; | ||
// eslint-disable-next-line @typescript-eslint/no-unsafe-return | ||
const getRetryDelayFromError = (error) => error === null || error === void 0 ? void 0 : error.retryAfterSeconds; | ||
const getRetryDelayFromError = (error) => (error === null || error === void 0 ? void 0 : error.retryAfterSeconds) !== undefined ? | ||
error.retryAfterSeconds * 1000 : undefined; | ||
exports.getRetryDelayFromError = getRetryDelayFromError; | ||
//# sourceMappingURL=network.js.map |
@@ -8,3 +8,3 @@ /*! | ||
export declare const pkgName = "@fluidframework/driver-utils"; | ||
export declare const pkgVersion = "0.39.3"; | ||
export declare const pkgVersion = "0.40.0-25719"; | ||
//# sourceMappingURL=packageVersion.d.ts.map |
@@ -11,3 +11,3 @@ "use strict"; | ||
exports.pkgName = "@fluidframework/driver-utils"; | ||
exports.pkgVersion = "0.39.3"; | ||
exports.pkgVersion = "0.40.0-25719"; | ||
//# sourceMappingURL=packageVersion.js.map |
@@ -1,2 +0,2 @@ | ||
import { ITelemetryLogger } from "@fluidframework/common-definitions"; | ||
import { ITelemetryLogger, ITelemetryProperties } from "@fluidframework/common-definitions"; | ||
import { ISequencedDocumentMessage } from "@fluidframework/protocol-definitions"; | ||
@@ -30,3 +30,3 @@ import { IDeltasFetchResult, IStream, IStreamResult } from "@fluidframework/driver-definitions"; | ||
private readonly knewTo; | ||
constructor(from: number, to: number | undefined, payloadSize: number, logger: ITelemetryLogger, requestCallback: (request: number, from: number, to: number, strongTo: boolean) => Promise<{ | ||
constructor(from: number, to: number | undefined, payloadSize: number, logger: ITelemetryLogger, requestCallback: (request: number, from: number, to: number, strongTo: boolean, props: ITelemetryProperties) => Promise<{ | ||
partial: boolean; | ||
@@ -59,3 +59,3 @@ cancel: boolean; | ||
} | ||
export declare function requestOps(get: (from: number, to: number) => Promise<IDeltasFetchResult>, concurrency: number, from: number, to: number | undefined, payloadSize: number, logger: ITelemetryLogger, signal?: AbortSignal): IStream<ISequencedDocumentMessage[]>; | ||
export declare function requestOps(get: (from: number, to: number, telemetryProps: ITelemetryProperties) => Promise<IDeltasFetchResult>, concurrency: number, fromTotal: number, toTotal: number | undefined, payloadSize: number, logger: ITelemetryLogger, signal?: AbortSignal): IStream<ISequencedDocumentMessage[]>; | ||
export declare const emptyMessageStream: IStream<ISequencedDocumentMessage[]>; | ||
@@ -62,0 +62,0 @@ export declare function streamFromMessages(messagesArg: Promise<ISequencedDocumentMessage[]>): IStream<ISequencedDocumentMessage[]>; |
@@ -12,4 +12,4 @@ "use strict"; | ||
const networkUtils_1 = require("./networkUtils"); | ||
const MaxFetchDelaySeconds = 10; | ||
const MissingFetchDelaySeconds = 0.1; | ||
const MaxFetchDelayInMs = 10000; | ||
const MissingFetchDelayInMs = 100; | ||
/** | ||
@@ -137,3 +137,3 @@ * Helper class to organize parallel fetching of data | ||
this.requests++; | ||
const promise = this.requestCallback(this.requests, from, to, this.to !== undefined); | ||
const promise = this.requestCallback(this.requests, from, to, this.to !== undefined, {}); | ||
// dispatch any prior received data | ||
@@ -304,12 +304,10 @@ this.dispatch(); | ||
*/ | ||
async function getSingleOpBatch(get, request, from, to, telemetryEvent, strongTo, signal) { | ||
async function getSingleOpBatch(get, props, strongTo, signal) { | ||
let lastSuccessTime; | ||
let retry = 0; | ||
const deltas = []; | ||
let deltasRetrievedTotal = 0; | ||
const nothing = { partial: false, cancel: true, payload: [] }; | ||
const start = common_utils_1.performance.now(); | ||
while ((signal === null || signal === void 0 ? void 0 : signal.aborted) !== true) { | ||
retry++; | ||
let delay = Math.min(MaxFetchDelaySeconds, MissingFetchDelaySeconds * Math.pow(2, retry)); | ||
let delay = Math.min(MaxFetchDelayInMs, MissingFetchDelayInMs * Math.pow(2, retry)); | ||
let canRetry = false; | ||
@@ -319,15 +317,7 @@ try { | ||
canRetry = true; | ||
const deltasP = get(from, to); | ||
const deltasP = get(Object.assign(Object.assign({}, props), { retry })); | ||
const { messages, partialResult } = await deltasP; | ||
deltas.push(...messages); | ||
const deltasRetrievedLast = messages.length; | ||
deltasRetrievedTotal += deltasRetrievedLast; | ||
if (deltasRetrievedLast !== 0 || !strongTo) { | ||
telemetryEvent.reportProgress({ | ||
chunkDeltas: deltasRetrievedTotal, | ||
chunkFrom: from, | ||
chunkTo: to, | ||
chunkRequests: retry, | ||
chunkDuration: telemetry_utils_1.TelemetryLogger.formatTick(common_utils_1.performance.now() - start), | ||
}); | ||
return { payload: deltas, cancel: false, partial: partialResult }; | ||
@@ -346,12 +336,3 @@ } | ||
// current as it can't get ops. | ||
telemetryEvent.cancel({ | ||
category: "error", | ||
error: "too many retries", | ||
retry, | ||
request, | ||
deltasRetrievedTotal, | ||
replayFrom: from, | ||
to, | ||
}); | ||
throw network_1.createGenericNetworkError("Failed to retrieve ops from storage: giving up after too many retries", false /* canRetry */); | ||
throw network_1.createGenericNetworkError("Failed to retrieve ops from storage: too many retries", false /* canRetry */, undefined /* retryAfterSeconds */, Object.assign({ retry }, props)); | ||
} | ||
@@ -367,5 +348,3 @@ } | ||
eventName: "GetDeltas_Error", | ||
fetchTo: to, | ||
from, | ||
request, | ||
...props, | ||
retry, | ||
@@ -377,3 +356,2 @@ }, | ||
// It's game over scenario. | ||
telemetryEvent.cancel({ category: "error" }, error); | ||
throw error; | ||
@@ -386,16 +364,7 @@ } | ||
} | ||
/* | ||
if (to !== undefined && this.lastQueuedSequenceNumber >= to) { | ||
// the client caught up while we were trying to fetch ops from storage | ||
// bail out since we no longer need to request these ops | ||
return nothing; | ||
} | ||
*/ | ||
await networkUtils_1.waitForConnectedState(delay * 1000); | ||
await networkUtils_1.waitForConnectedState(delay); | ||
} | ||
// Might need to change to non-error event | ||
telemetryEvent.cancel({ error: "container closed" }); | ||
return nothing; | ||
} | ||
function requestOps(get, concurrency, from, to, payloadSize, logger, signal) { | ||
function requestOps(get, concurrency, fromTotal, toTotal, payloadSize, logger, signal) { | ||
let requests = 0; | ||
@@ -405,10 +374,10 @@ let lastFetch; | ||
const queue = new Queue(); | ||
const telemetryEvent = telemetry_utils_1.PerformanceEvent.start(logger, { | ||
eventName: `GetDeltas`, | ||
from, | ||
to, | ||
}); | ||
const manager = new ParallelRequests(from, to, payloadSize, logger, async (request, _from, _to, strongTo) => { | ||
const propsTotal = { | ||
fromTotal, | ||
toTotal, | ||
}; | ||
const telemetryEvent = telemetry_utils_1.PerformanceEvent.start(logger, Object.assign({ eventName: `GetDeltas` }, propsTotal)); | ||
const manager = new ParallelRequests(fromTotal, toTotal, payloadSize, logger, async (request, from, to, strongTo, propsPerRequest) => { | ||
requests++; | ||
return getSingleOpBatch(get, request, _from, _to, telemetryEvent, strongTo, signal); | ||
return getSingleOpBatch(async (propsAll) => get(from, to, propsAll), Object.assign(Object.assign({ request, from, to }, propsTotal), propsPerRequest), strongTo, signal); | ||
}, (deltas) => { | ||
@@ -415,0 +384,0 @@ lastFetch = deltas[deltas.length - 1].sequenceNumber; |
@@ -17,2 +17,3 @@ /*! | ||
export * from "./networkUtils"; | ||
export * from "./runWithRetry"; | ||
//# sourceMappingURL=index.d.ts.map |
@@ -17,2 +17,3 @@ /*! | ||
export * from "./networkUtils"; | ||
export * from "./runWithRetry"; | ||
//# sourceMappingURL=index.js.map |
@@ -19,4 +19,10 @@ /*! | ||
readonly errorType = DriverErrorType.genericNetworkError; | ||
constructor(errorMessage: string, canRetry: boolean, statusCode: number | undefined); | ||
constructor(errorMessage: string, canRetry: boolean, props?: ITaggableTelemetryProperties); | ||
} | ||
export declare class DeltaStreamConnectionForbiddenError extends LoggingError { | ||
static readonly errorType: string; | ||
readonly errorType: string; | ||
readonly canRetry = false; | ||
constructor(errorMessage: string); | ||
} | ||
export declare class AuthorizationError extends LoggingError implements IAuthorizationError { | ||
@@ -27,3 +33,3 @@ readonly claims: string | undefined; | ||
readonly canRetry = false; | ||
constructor(errorMessage: string, claims: string | undefined, tenantId: string | undefined, statusCode: number); | ||
constructor(errorMessage: string, claims: string | undefined, tenantId: string | undefined, props?: ITaggableTelemetryProperties); | ||
} | ||
@@ -50,6 +56,6 @@ export declare class NetworkErrorBasic<T> extends LoggingError { | ||
readonly canRetry = true; | ||
constructor(errorMessage: string, retryAfterSeconds: number, statusCode?: number); | ||
constructor(errorMessage: string, retryAfterSeconds: number, props?: ITaggableTelemetryProperties); | ||
} | ||
export declare const createWriteError: (errorMessage: string) => NonRetryableError<DriverErrorType>; | ||
export declare function createGenericNetworkError(errorMessage: string, canRetry: boolean, retryAfterSeconds?: number, statusCode?: number): GenericNetworkError | ThrottlingError; | ||
export declare function createGenericNetworkError(errorMessage: string, canRetry: boolean, retryAfterSeconds?: number, props?: ITaggableTelemetryProperties): GenericNetworkError | ThrottlingError; | ||
/** | ||
@@ -56,0 +62,0 @@ * Check if a connection error can be retried. Unless explicitly allowed, retry is disallowed. |
@@ -5,2 +5,3 @@ /*! | ||
*/ | ||
var _a; | ||
import { DriverErrorType, } from "@fluidframework/driver-definitions"; | ||
@@ -28,4 +29,4 @@ import { LoggingError } from "@fluidframework/telemetry-utils"; | ||
export class GenericNetworkError extends LoggingError { | ||
constructor(errorMessage, canRetry, statusCode) { | ||
super(errorMessage, { statusCode }); | ||
constructor(errorMessage, canRetry, props) { | ||
super(errorMessage, props); | ||
this.canRetry = canRetry; | ||
@@ -35,5 +36,19 @@ this.errorType = DriverErrorType.genericNetworkError; | ||
} | ||
// Todo GH #6214: Remove after next drive def bump. This is necessary as there is no | ||
// compatible way to augment an enum, as it can't be optional. So for now | ||
// we need to duplicate the value here. We likely need to rethink our | ||
// DriverErrorType strategy so that it supports extension with optional | ||
// value. | ||
const deltaStreamConnectionForbiddenStr = "deltaStreamConnectionForbidden"; | ||
export class DeltaStreamConnectionForbiddenError extends LoggingError { | ||
constructor(errorMessage) { | ||
super(errorMessage, { statusCode: 400 }); | ||
this.errorType = DeltaStreamConnectionForbiddenError.errorType; | ||
this.canRetry = false; | ||
} | ||
} | ||
DeltaStreamConnectionForbiddenError.errorType = (_a = DriverErrorType[deltaStreamConnectionForbiddenStr]) !== null && _a !== void 0 ? _a : deltaStreamConnectionForbiddenStr; | ||
export class AuthorizationError extends LoggingError { | ||
constructor(errorMessage, claims, tenantId, statusCode) { | ||
super(errorMessage, { statusCode }); | ||
constructor(errorMessage, claims, tenantId, props) { | ||
super(errorMessage, props); | ||
this.claims = claims; | ||
@@ -68,4 +83,4 @@ this.tenantId = tenantId; | ||
export class ThrottlingError extends LoggingError { | ||
constructor(errorMessage, retryAfterSeconds, statusCode) { | ||
super(errorMessage, { statusCode }); | ||
constructor(errorMessage, retryAfterSeconds, props) { | ||
super(errorMessage, props); | ||
this.retryAfterSeconds = retryAfterSeconds; | ||
@@ -77,7 +92,7 @@ this.errorType = DriverErrorType.throttlingError; | ||
export const createWriteError = (errorMessage) => new NonRetryableError(errorMessage, DriverErrorType.writeError); | ||
export function createGenericNetworkError(errorMessage, canRetry, retryAfterSeconds, statusCode) { | ||
export function createGenericNetworkError(errorMessage, canRetry, retryAfterSeconds, props) { | ||
if (retryAfterSeconds !== undefined && canRetry) { | ||
return new ThrottlingError(errorMessage, retryAfterSeconds, statusCode); | ||
return new ThrottlingError(errorMessage, retryAfterSeconds, props); | ||
} | ||
return new GenericNetworkError(errorMessage, canRetry, statusCode); | ||
return new GenericNetworkError(errorMessage, canRetry, props); | ||
} | ||
@@ -90,4 +105,4 @@ /** | ||
export const canRetryOnError = (error) => (error === null || error === void 0 ? void 0 : error.canRetry) === true; | ||
// eslint-disable-next-line @typescript-eslint/no-unsafe-return | ||
export const getRetryDelayFromError = (error) => error === null || error === void 0 ? void 0 : error.retryAfterSeconds; | ||
export const getRetryDelayFromError = (error) => (error === null || error === void 0 ? void 0 : error.retryAfterSeconds) !== undefined ? | ||
error.retryAfterSeconds * 1000 : undefined; | ||
//# sourceMappingURL=network.js.map |
@@ -8,3 +8,3 @@ /*! | ||
export declare const pkgName = "@fluidframework/driver-utils"; | ||
export declare const pkgVersion = "0.39.3"; | ||
export declare const pkgVersion = "0.40.0-25719"; | ||
//# sourceMappingURL=packageVersion.d.ts.map |
@@ -8,3 +8,3 @@ /*! | ||
export const pkgName = "@fluidframework/driver-utils"; | ||
export const pkgVersion = "0.39.3"; | ||
export const pkgVersion = "0.40.0-25719"; | ||
//# sourceMappingURL=packageVersion.js.map |
@@ -1,2 +0,2 @@ | ||
import { ITelemetryLogger } from "@fluidframework/common-definitions"; | ||
import { ITelemetryLogger, ITelemetryProperties } from "@fluidframework/common-definitions"; | ||
import { ISequencedDocumentMessage } from "@fluidframework/protocol-definitions"; | ||
@@ -30,3 +30,3 @@ import { IDeltasFetchResult, IStream, IStreamResult } from "@fluidframework/driver-definitions"; | ||
private readonly knewTo; | ||
constructor(from: number, to: number | undefined, payloadSize: number, logger: ITelemetryLogger, requestCallback: (request: number, from: number, to: number, strongTo: boolean) => Promise<{ | ||
constructor(from: number, to: number | undefined, payloadSize: number, logger: ITelemetryLogger, requestCallback: (request: number, from: number, to: number, strongTo: boolean, props: ITelemetryProperties) => Promise<{ | ||
partial: boolean; | ||
@@ -59,3 +59,3 @@ cancel: boolean; | ||
} | ||
export declare function requestOps(get: (from: number, to: number) => Promise<IDeltasFetchResult>, concurrency: number, from: number, to: number | undefined, payloadSize: number, logger: ITelemetryLogger, signal?: AbortSignal): IStream<ISequencedDocumentMessage[]>; | ||
export declare function requestOps(get: (from: number, to: number, telemetryProps: ITelemetryProperties) => Promise<IDeltasFetchResult>, concurrency: number, fromTotal: number, toTotal: number | undefined, payloadSize: number, logger: ITelemetryLogger, signal?: AbortSignal): IStream<ISequencedDocumentMessage[]>; | ||
export declare const emptyMessageStream: IStream<ISequencedDocumentMessage[]>; | ||
@@ -62,0 +62,0 @@ export declare function streamFromMessages(messagesArg: Promise<ISequencedDocumentMessage[]>): IStream<ISequencedDocumentMessage[]>; |
@@ -5,8 +5,8 @@ /*! | ||
*/ | ||
import { assert, Deferred, performance } from "@fluidframework/common-utils"; | ||
import { PerformanceEvent, TelemetryLogger } from "@fluidframework/telemetry-utils"; | ||
import { assert, Deferred } from "@fluidframework/common-utils"; | ||
import { PerformanceEvent } from "@fluidframework/telemetry-utils"; | ||
import { getRetryDelayFromError, canRetryOnError, createGenericNetworkError } from "./network"; | ||
import { waitForConnectedState } from "./networkUtils"; | ||
const MaxFetchDelaySeconds = 10; | ||
const MissingFetchDelaySeconds = 0.1; | ||
const MaxFetchDelayInMs = 10000; | ||
const MissingFetchDelayInMs = 100; | ||
/** | ||
@@ -134,3 +134,3 @@ * Helper class to organize parallel fetching of data | ||
this.requests++; | ||
const promise = this.requestCallback(this.requests, from, to, this.to !== undefined); | ||
const promise = this.requestCallback(this.requests, from, to, this.to !== undefined, {}); | ||
// dispatch any prior received data | ||
@@ -299,12 +299,10 @@ this.dispatch(); | ||
*/ | ||
async function getSingleOpBatch(get, request, from, to, telemetryEvent, strongTo, signal) { | ||
async function getSingleOpBatch(get, props, strongTo, signal) { | ||
let lastSuccessTime; | ||
let retry = 0; | ||
const deltas = []; | ||
let deltasRetrievedTotal = 0; | ||
const nothing = { partial: false, cancel: true, payload: [] }; | ||
const start = performance.now(); | ||
while ((signal === null || signal === void 0 ? void 0 : signal.aborted) !== true) { | ||
retry++; | ||
let delay = Math.min(MaxFetchDelaySeconds, MissingFetchDelaySeconds * Math.pow(2, retry)); | ||
let delay = Math.min(MaxFetchDelayInMs, MissingFetchDelayInMs * Math.pow(2, retry)); | ||
let canRetry = false; | ||
@@ -314,15 +312,7 @@ try { | ||
canRetry = true; | ||
const deltasP = get(from, to); | ||
const deltasP = get(Object.assign(Object.assign({}, props), { retry })); | ||
const { messages, partialResult } = await deltasP; | ||
deltas.push(...messages); | ||
const deltasRetrievedLast = messages.length; | ||
deltasRetrievedTotal += deltasRetrievedLast; | ||
if (deltasRetrievedLast !== 0 || !strongTo) { | ||
telemetryEvent.reportProgress({ | ||
chunkDeltas: deltasRetrievedTotal, | ||
chunkFrom: from, | ||
chunkTo: to, | ||
chunkRequests: retry, | ||
chunkDuration: TelemetryLogger.formatTick(performance.now() - start), | ||
}); | ||
return { payload: deltas, cancel: false, partial: partialResult }; | ||
@@ -341,12 +331,3 @@ } | ||
// current as it can't get ops. | ||
telemetryEvent.cancel({ | ||
category: "error", | ||
error: "too many retries", | ||
retry, | ||
request, | ||
deltasRetrievedTotal, | ||
replayFrom: from, | ||
to, | ||
}); | ||
throw createGenericNetworkError("Failed to retrieve ops from storage: giving up after too many retries", false /* canRetry */); | ||
throw createGenericNetworkError("Failed to retrieve ops from storage: too many retries", false /* canRetry */, undefined /* retryAfterSeconds */, Object.assign({ retry }, props)); | ||
} | ||
@@ -362,5 +343,3 @@ } | ||
eventName: "GetDeltas_Error", | ||
fetchTo: to, | ||
from, | ||
request, | ||
...props, | ||
retry, | ||
@@ -372,3 +351,2 @@ }, | ||
// It's game over scenario. | ||
telemetryEvent.cancel({ category: "error" }, error); | ||
throw error; | ||
@@ -381,16 +359,7 @@ } | ||
} | ||
/* | ||
if (to !== undefined && this.lastQueuedSequenceNumber >= to) { | ||
// the client caught up while we were trying to fetch ops from storage | ||
// bail out since we no longer need to request these ops | ||
return nothing; | ||
} | ||
*/ | ||
await waitForConnectedState(delay * 1000); | ||
await waitForConnectedState(delay); | ||
} | ||
// Might need to change to non-error event | ||
telemetryEvent.cancel({ error: "container closed" }); | ||
return nothing; | ||
} | ||
export function requestOps(get, concurrency, from, to, payloadSize, logger, signal) { | ||
export function requestOps(get, concurrency, fromTotal, toTotal, payloadSize, logger, signal) { | ||
let requests = 0; | ||
@@ -400,10 +369,10 @@ let lastFetch; | ||
const queue = new Queue(); | ||
const telemetryEvent = PerformanceEvent.start(logger, { | ||
eventName: `GetDeltas`, | ||
from, | ||
to, | ||
}); | ||
const manager = new ParallelRequests(from, to, payloadSize, logger, async (request, _from, _to, strongTo) => { | ||
const propsTotal = { | ||
fromTotal, | ||
toTotal, | ||
}; | ||
const telemetryEvent = PerformanceEvent.start(logger, Object.assign({ eventName: `GetDeltas` }, propsTotal)); | ||
const manager = new ParallelRequests(fromTotal, toTotal, payloadSize, logger, async (request, from, to, strongTo, propsPerRequest) => { | ||
requests++; | ||
return getSingleOpBatch(get, request, _from, _to, telemetryEvent, strongTo, signal); | ||
return getSingleOpBatch(async (propsAll) => get(from, to, propsAll), Object.assign(Object.assign({ request, from, to }, propsTotal), propsPerRequest), strongTo, signal); | ||
}, (deltas) => { | ||
@@ -410,0 +379,0 @@ lastFetch = deltas[deltas.length - 1].sequenceNumber; |
{ | ||
"name": "@fluidframework/driver-utils", | ||
"version": "0.39.3", | ||
"version": "0.40.0-25719", | ||
"description": "Collection of utility functions for Fluid drivers", | ||
@@ -14,3 +14,3 @@ "homepage": "https://fluidframework.com", | ||
"scripts": { | ||
"build": "npm run build:genver && concurrently npm:build:compile npm:lint", | ||
"build": "npm run build:genver && concurrently npm:build:compile npm:lint && npm run build:docs", | ||
"build:commonjs": "npm run tsc && npm run build:test", | ||
@@ -24,2 +24,3 @@ "build:compile": "concurrently npm:build:commonjs npm:build:esnext", | ||
"build:test": "tsc --project ./src/test/tsconfig.json", | ||
"ci:build:docs": "api-extractor run && copyfiles -u 1 ./_api-extractor-temp/doc-models/* ../../../_api-extractor-temp/", | ||
"clean": "rimraf dist lib *.tsbuildinfo *.build.log", | ||
@@ -59,10 +60,10 @@ "eslint": "eslint --format stylish src", | ||
"dependencies": { | ||
"@fluidframework/common-definitions": "^0.20.0", | ||
"@fluidframework/common-utils": "^0.30.0", | ||
"@fluidframework/core-interfaces": "^0.39.3", | ||
"@fluidframework/driver-definitions": "^0.39.3", | ||
"@fluidframework/gitresources": "^0.1024.0", | ||
"@fluidframework/protocol-base": "^0.1024.0", | ||
"@fluidframework/common-definitions": "^0.20.0-0", | ||
"@fluidframework/common-utils": "^0.30.0-0", | ||
"@fluidframework/core-interfaces": "^0.39.0", | ||
"@fluidframework/driver-definitions": "^0.39.0", | ||
"@fluidframework/gitresources": "^0.1025.0-0", | ||
"@fluidframework/protocol-base": "^0.1025.0-0", | ||
"@fluidframework/protocol-definitions": "^0.1024.0", | ||
"@fluidframework/telemetry-utils": "^0.39.3", | ||
"@fluidframework/telemetry-utils": "0.40.0-25719", | ||
"assert": "^2.0.0", | ||
@@ -72,9 +73,9 @@ "uuid": "^8.3.1" | ||
"devDependencies": { | ||
"@fluidframework/build-common": "^0.22.0", | ||
"@fluidframework/build-common": "^0.22.0-0", | ||
"@fluidframework/eslint-config-fluid": "^0.23.0", | ||
"@fluidframework/mocha-test-setup": "^0.39.3", | ||
"@fluidframework/runtime-utils": "^0.39.3", | ||
"@fluidframework/mocha-test-setup": "0.40.0-25719", | ||
"@fluidframework/runtime-utils": "0.40.0-25719", | ||
"@microsoft/api-extractor": "^7.13.1", | ||
"@types/assert": "^1.5.2", | ||
"@types/mocha": "^5.2.5", | ||
"@types/mocha": "^8.2.2", | ||
"@types/node": "^12.19.0", | ||
@@ -93,3 +94,3 @@ "@typescript-eslint/eslint-plugin": "~4.14.0", | ||
"eslint-plugin-unicorn": "~26.0.1", | ||
"mocha": "^8.1.1", | ||
"mocha": "^8.4.0", | ||
"nyc": "^15.0.0", | ||
@@ -96,0 +97,0 @@ "rimraf": "^2.6.2", |
@@ -18,1 +18,2 @@ /*! | ||
export * from "./networkUtils"; | ||
export * from "./runWithRetry"; |
@@ -40,8 +40,25 @@ /*! | ||
readonly canRetry: boolean, | ||
statusCode: number | undefined, | ||
props?: ITaggableTelemetryProperties, | ||
) { | ||
super(errorMessage, { statusCode }); | ||
super(errorMessage, props); | ||
} | ||
} | ||
// Todo GH #6214: Remove after next drive def bump. This is necessary as there is no | ||
// compatible way to augment an enum, as it can't be optional. So for now | ||
// we need to duplicate the value here. We likely need to rethink our | ||
// DriverErrorType strategy so that it supports extension with optional | ||
// value. | ||
const deltaStreamConnectionForbiddenStr = "deltaStreamConnectionForbidden"; | ||
export class DeltaStreamConnectionForbiddenError extends LoggingError { | ||
static readonly errorType: string = | ||
DriverErrorType[deltaStreamConnectionForbiddenStr] ?? deltaStreamConnectionForbiddenStr; | ||
readonly errorType: string = DeltaStreamConnectionForbiddenError.errorType; | ||
readonly canRetry = false; | ||
constructor(errorMessage: string) { | ||
super(errorMessage, { statusCode: 400 }); | ||
} | ||
} | ||
export class AuthorizationError extends LoggingError implements IAuthorizationError { | ||
@@ -55,5 +72,5 @@ readonly errorType = DriverErrorType.authorizationError; | ||
readonly tenantId: string | undefined, | ||
statusCode: number, | ||
props?: ITaggableTelemetryProperties, | ||
) { | ||
super(errorMessage, { statusCode }); | ||
super(errorMessage, props); | ||
} | ||
@@ -103,5 +120,5 @@ } | ||
readonly retryAfterSeconds: number, | ||
statusCode?: number, | ||
props?: ITaggableTelemetryProperties, | ||
) { | ||
super(errorMessage, { statusCode }); | ||
super(errorMessage, props); | ||
} | ||
@@ -117,7 +134,7 @@ } | ||
retryAfterSeconds?: number, | ||
statusCode?: number) { | ||
props?: ITaggableTelemetryProperties) { | ||
if (retryAfterSeconds !== undefined && canRetry) { | ||
return new ThrottlingError(errorMessage, retryAfterSeconds, statusCode); | ||
return new ThrottlingError(errorMessage, retryAfterSeconds, props); | ||
} | ||
return new GenericNetworkError(errorMessage, canRetry, statusCode); | ||
return new GenericNetworkError(errorMessage, canRetry, props); | ||
} | ||
@@ -132,3 +149,3 @@ | ||
// eslint-disable-next-line @typescript-eslint/no-unsafe-return | ||
export const getRetryDelayFromError = (error: any): number | undefined => error?.retryAfterSeconds; | ||
export const getRetryDelayFromError = (error: any): number | undefined => error?.retryAfterSeconds !== undefined ? | ||
error.retryAfterSeconds * 1000 : undefined; |
@@ -9,2 +9,2 @@ /*! | ||
export const pkgName = "@fluidframework/driver-utils"; | ||
export const pkgVersion = "0.39.3"; | ||
export const pkgVersion = "0.40.0-25719"; |
@@ -5,5 +5,5 @@ /*! | ||
*/ | ||
import { assert, Deferred, performance } from "@fluidframework/common-utils"; | ||
import { ITelemetryLogger } from "@fluidframework/common-definitions"; | ||
import { PerformanceEvent, TelemetryLogger } from "@fluidframework/telemetry-utils"; | ||
import { assert, Deferred } from "@fluidframework/common-utils"; | ||
import { ITelemetryLogger, ITelemetryProperties } from "@fluidframework/common-definitions"; | ||
import { PerformanceEvent} from "@fluidframework/telemetry-utils"; | ||
import { ISequencedDocumentMessage } from "@fluidframework/protocol-definitions"; | ||
@@ -14,4 +14,4 @@ import { IDeltasFetchResult, IStream, IStreamResult } from "@fluidframework/driver-definitions"; | ||
const MaxFetchDelaySeconds = 10; | ||
const MissingFetchDelaySeconds = 0.1; | ||
const MaxFetchDelayInMs = 10000; | ||
const MissingFetchDelayInMs = 100; | ||
@@ -45,4 +45,8 @@ /** | ||
private readonly logger: ITelemetryLogger, | ||
private readonly requestCallback: (request: number, from: number, to: number, strongTo: boolean) => | ||
Promise<{ partial: boolean, cancel: boolean, payload: T[] }>, | ||
private readonly requestCallback: ( | ||
request: number, | ||
from: number, | ||
to: number, | ||
strongTo: boolean, | ||
props: ITelemetryProperties) => Promise<{ partial: boolean, cancel: boolean, payload: T[] }>, | ||
private readonly responseCallback: (payload: T[]) => void) | ||
@@ -168,3 +172,3 @@ { | ||
const promise = this.requestCallback(this.requests, from, to, this.to !== undefined); | ||
const promise = this.requestCallback(this.requests, from, to, this.to !== undefined, {}); | ||
@@ -348,7 +352,4 @@ // dispatch any prior received data | ||
async function getSingleOpBatch( | ||
get: (from: number, to: number) => Promise<IDeltasFetchResult>, | ||
request: number, | ||
from: number, | ||
to: number, | ||
telemetryEvent: PerformanceEvent, | ||
get: (telemetryProps: ITelemetryProperties) => Promise<IDeltasFetchResult>, | ||
props: ITelemetryProperties, | ||
strongTo: boolean, | ||
@@ -362,10 +363,7 @@ signal?: AbortSignal): | ||
const deltas: ISequencedDocumentMessage[] = []; | ||
let deltasRetrievedTotal = 0; | ||
const nothing = { partial: false, cancel: true, payload: []}; | ||
const start = performance.now(); | ||
while (signal?.aborted !== true) { | ||
retry++; | ||
let delay = Math.min(MaxFetchDelaySeconds, MissingFetchDelaySeconds * Math.pow(2, retry)); | ||
let delay = Math.min(MaxFetchDelayInMs, MissingFetchDelayInMs * Math.pow(2, retry)); | ||
let canRetry = false; | ||
@@ -376,3 +374,3 @@ | ||
canRetry = true; | ||
const deltasP = get(from, to); | ||
const deltasP = get({ ...props, retry } /* telemetry props */); | ||
@@ -383,12 +381,4 @@ const { messages, partialResult } = await deltasP; | ||
const deltasRetrievedLast = messages.length; | ||
deltasRetrievedTotal += deltasRetrievedLast; | ||
if (deltasRetrievedLast !== 0 || !strongTo) { | ||
telemetryEvent.reportProgress({ | ||
chunkDeltas: deltasRetrievedTotal, | ||
chunkFrom: from, | ||
chunkTo: to, | ||
chunkRequests: retry, | ||
chunkDuration: TelemetryLogger.formatTick(performance.now() - start), | ||
}); | ||
return { payload: deltas, cancel: false, partial: partialResult}; | ||
@@ -408,14 +398,10 @@ } | ||
// current as it can't get ops. | ||
telemetryEvent.cancel({ | ||
category: "error", | ||
error: "too many retries", | ||
retry, | ||
request, | ||
deltasRetrievedTotal, | ||
replayFrom: from, | ||
to, | ||
}); | ||
throw createGenericNetworkError( | ||
"Failed to retrieve ops from storage: giving up after too many retries", | ||
"Failed to retrieve ops from storage: too many retries", | ||
false /* canRetry */, | ||
undefined /* retryAfterSeconds */, | ||
{ | ||
retry, | ||
...props, | ||
}, | ||
); | ||
@@ -433,5 +419,3 @@ } | ||
eventName: "GetDeltas_Error", | ||
fetchTo: to, | ||
from, | ||
request, | ||
...props, | ||
retry, | ||
@@ -444,3 +428,2 @@ }, | ||
// It's game over scenario. | ||
telemetryEvent.cancel({ category: "error" }, error); | ||
throw error; | ||
@@ -455,15 +438,5 @@ } | ||
/* | ||
if (to !== undefined && this.lastQueuedSequenceNumber >= to) { | ||
// the client caught up while we were trying to fetch ops from storage | ||
// bail out since we no longer need to request these ops | ||
return nothing; | ||
} | ||
*/ | ||
await waitForConnectedState(delay * 1000); | ||
await waitForConnectedState(delay); | ||
} | ||
// Might need to change to non-error event | ||
telemetryEvent.cancel({ error: "container closed" }); | ||
return nothing; | ||
@@ -473,6 +446,6 @@ } | ||
export function requestOps( | ||
get: (from: number, to: number) => Promise<IDeltasFetchResult>, | ||
get: (from: number, to: number, telemetryProps: ITelemetryProperties) => Promise<IDeltasFetchResult>, | ||
concurrency: number, | ||
from: number, | ||
to: number | undefined, | ||
fromTotal: number, | ||
toTotal: number | undefined, | ||
payloadSize: number, | ||
@@ -487,16 +460,25 @@ logger: ITelemetryLogger, | ||
const propsTotal: ITelemetryProperties = { | ||
fromTotal, | ||
toTotal, | ||
}; | ||
const telemetryEvent = PerformanceEvent.start(logger, { | ||
eventName: `GetDeltas`, | ||
from, | ||
to, | ||
...propsTotal, | ||
}); | ||
const manager = new ParallelRequests<ISequencedDocumentMessage>( | ||
from, | ||
to, | ||
fromTotal, | ||
toTotal, | ||
payloadSize, | ||
logger, | ||
async (request: number, _from: number, _to: number, strongTo: boolean) => { | ||
async (request: number, from: number, to: number, strongTo: boolean, propsPerRequest: ITelemetryProperties) => { | ||
requests++; | ||
return getSingleOpBatch(get, request, _from, _to, telemetryEvent, strongTo, signal); | ||
return getSingleOpBatch( | ||
async (propsAll) => get(from, to, propsAll), | ||
{ request, from, to, ...propsTotal, ...propsPerRequest }, | ||
strongTo, | ||
signal, | ||
); | ||
}, | ||
@@ -503,0 +485,0 @@ (deltas: ISequencedDocumentMessage[]) => { |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
456454
142
4931
+ Added@fluidframework/gitresources@0.1025.1(transitive)
+ Added@fluidframework/protocol-base@0.1025.1(transitive)
+ Added@fluidframework/telemetry-utils@0.40.0-25719(transitive)
- Removed@fluidframework/gitresources@0.1024.1(transitive)
- Removed@fluidframework/protocol-base@0.1024.1(transitive)
- Removed@fluidframework/telemetry-utils@0.39.8(transitive)