Socket
Socket
Sign inDemoInstall

google-gax

Package Overview
Dependencies
88
Maintainers
4
Versions
355
Alerts
File Explorer

Advanced tools

Install Socket

Detect and block malicious and high-risk dependencies

Install

Comparing version 4.0.6-experimental to 4.1.0

build/src/streamingRetryRequest.d.ts

1

build/src/clientInterface.d.ts

@@ -13,2 +13,3 @@ import { GrpcClientOptions, ClientStubOptions } from './grpc';

apiEndpoint?: string;
gaxServerStreamingRetries?: boolean;
}

@@ -15,0 +16,0 @@ export interface Descriptors {

42

build/src/createApiCall.js

@@ -23,4 +23,7 @@ "use strict";

const apiCaller_1 = require("./apiCaller");
const gax_1 = require("./gax");
const retries_1 = require("./normalCalls/retries");
const timeout_1 = require("./normalCalls/timeout");
const streamingApiCaller_1 = require("./streamingCalls/streamingApiCaller");
const warnings_1 = require("./warnings");
/**

@@ -56,4 +59,14 @@ * Converts an rpc call into an API call governed by the settings.

return (request, callOptions, callback) => {
const thisSettings = settings.merge(callOptions);
var _a, _b;
let currentApiCaller = apiCaller;
let thisSettings;
if (currentApiCaller instanceof streamingApiCaller_1.StreamingApiCaller) {
const gaxStreamingRetries = (_b = (_a = currentApiCaller.descriptor) === null || _a === void 0 ? void 0 : _a.gaxStreamingRetries) !== null && _b !== void 0 ? _b : false;
// If Gax streaming retries are enabled, check settings passed at call time and convert parameters if needed
const convertedRetryOptions = (0, gax_1.convertRetryOptions)(callOptions, gaxStreamingRetries);
thisSettings = settings.merge(convertedRetryOptions);
}
else {
thisSettings = settings.merge(callOptions);
}
// special case: if bundling is disabled for this one call,

@@ -67,3 +80,4 @@ // use default API caller instead

.then((func) => {
var _a;
var _a, _b;
var _c;
// Initially, the function is just what gRPC server stub contains.

@@ -73,11 +87,21 @@ func = currentApiCaller.wrap(func);

const retry = thisSettings.retry;
if (!streaming &&
if (streaming &&
retry &&
retry.retryCodes &&
retry.retryCodes.length > 0) {
retry.backoffSettings.initialRpcTimeoutMillis =
retry.backoffSettings.initialRpcTimeoutMillis ||
thisSettings.timeout;
return (0, retries_1.retryable)(func, thisSettings.retry, thisSettings.otherArgs, thisSettings.apiName);
retry.retryCodes.length > 0 &&
retry.shouldRetryFn) {
(0, warnings_1.warn)('either_retrycodes_or_shouldretryfn', 'Only one of retryCodes or shouldRetryFn may be defined. Ignoring retryCodes.');
retry.retryCodes = [];
}
if (!streaming && retry) {
if (retry.shouldRetryFn) {
throw new Error('Using a function to determine retry eligibility is only supported with server streaming calls');
}
if (retry.getResumptionRequestFn) {
throw new Error('Resumption strategy can only be used with server streaming retries');
}
if (retry.retryCodes && retry.retryCodes.length > 0) {
(_b = (_c = retry.backoffSettings).initialRpcTimeoutMillis) !== null && _b !== void 0 ? _b : (_c.initialRpcTimeoutMillis = thisSettings.timeout);
return (0, retries_1.retryable)(func, thisSettings.retry, thisSettings.otherArgs, thisSettings.apiName);
}
}
return (0, timeout_1.addTimeoutArg)(func, thisSettings.timeout, thisSettings.otherArgs);

@@ -84,0 +108,0 @@ })

@@ -20,3 +20,5 @@ /**

import type { Message } from 'protobufjs';
import { GoogleError } from './googleError';
import { BundleOptions } from './bundlingCalls/bundleExecutor';
import { RequestType } from './apitypes';
/**

@@ -65,5 +67,8 @@ * Encapsulates the overridable settings for a particular API call.

* Per-call configurable settings for retrying upon transient failure.
* @implements {RetryOptionsType}
* @typedef {Object} RetryOptions
* @property {String[]} retryCodes
* @property {number[]} retryCodes
* @property {BackoffSettings} backoffSettings
* @property {(function)} shouldRetryFn
* @property {(function)} getResumptionRequestFn
*/

@@ -73,4 +78,21 @@ export declare class RetryOptions {

backoffSettings: BackoffSettings;
constructor(retryCodes: number[], backoffSettings: BackoffSettings);
shouldRetryFn?: (error: GoogleError) => boolean;
getResumptionRequestFn?: (request: RequestType) => RequestType;
constructor(retryCodes: number[], backoffSettings: BackoffSettings, shouldRetryFn?: (error: GoogleError) => boolean, getResumptionRequestFn?: (request: RequestType) => RequestType);
}
/**
* Per-call configurable settings for working with retry-request
* See the repo README for more about the parameters
* https://github.com/googleapis/retry-request
* Will be deprecated in a future release. Only relevant to server streaming calls
* @typedef {Object} RetryOptions
* @property {boolean} objectMode - when true utilizes object mode in streams
* @property {request} request - the request to retry
* @property {number} noResponseRetries - number of times to retry on no response
* @property {number} currentRetryAttempt - what # retry attempt retry-request is on
* @property {Function} shouldRetryFn - determines whether to retry, returns a boolean
* @property {number} maxRetryDelay - maximum retry delay in seconds
* @property {number} retryDelayMultiplier - multiplier to increase the delay in between completion of failed requests
* @property {number} totalTimeout - total timeout in seconds
*/
export interface RetryRequestOptions {

@@ -82,3 +104,6 @@ objectMode?: boolean;

currentRetryAttempt?: number;
shouldRetryFn?: () => boolean;
shouldRetryFn?: (error: GoogleError) => boolean;
maxRetryDelay?: number;
retryDelayMultiplier?: number;
totalTimeout?: number;
}

@@ -178,12 +203,23 @@ /**

/**
* Validates passed retry options in preparation for eventual parameter deprecation
* converts retryRequestOptions to retryOptions
* then sets retryRequestOptions to null
*
* @param {CallOptions} options - a list of passed retry option
* @return {CallOptions} A new CallOptions object.
*
*/
export declare function convertRetryOptions(options?: CallOptions, gaxStreamingRetries?: boolean): CallOptions | undefined;
/**
* Per-call configurable settings for retrying upon transient failure.
*
* @param {number[]} retryCodes - a list of Google API canonical error codes
* @param {number[]} retryCodes - a list of Google API canonical error codes OR a function that returns a boolean to determine retry behavior
* upon which a retry should be attempted.
* @param {BackoffSettings} backoffSettings - configures the retry
* exponential backoff algorithm.
* @param {function} shouldRetryFn - a function that determines whether a call should retry. If this is defined retryCodes must be empty
* @param {function} getResumptionRequestFn - a function with a resumption strategy - only used with server streaming retries
* @return {RetryOptions} A new RetryOptions object.
*
*/
export declare function createRetryOptions(retryCodes: number[], backoffSettings: BackoffSettings): RetryOptions;
export declare function createRetryOptions(retryCodes: number[], backoffSettings: BackoffSettings, shouldRetryFn?: (error: GoogleError) => boolean, getResumptionRequestFn?: (request: RequestType) => RequestType): RetryOptions;
/**

@@ -190,0 +226,0 @@ * Parameters to the exponential backoff algorithm for retrying.

@@ -18,5 +18,6 @@ "use strict";

Object.defineProperty(exports, "__esModule", { value: true });
exports.createByteLengthFunction = exports.constructSettings = exports.createBundleOptions = exports.createMaxRetriesBackoffSettings = exports.createDefaultBackoffSettings = exports.createBackoffSettings = exports.createRetryOptions = exports.CallSettings = exports.RetryOptions = void 0;
exports.createByteLengthFunction = exports.constructSettings = exports.createBundleOptions = exports.createMaxRetriesBackoffSettings = exports.createDefaultBackoffSettings = exports.createBackoffSettings = exports.createRetryOptions = exports.convertRetryOptions = exports.CallSettings = exports.RetryOptions = void 0;
const warnings_1 = require("./warnings");
const util_1 = require("./util");
const status_1 = require("./status");
/**

@@ -65,10 +66,15 @@ * Encapsulates the overridable settings for a particular API call.

* Per-call configurable settings for retrying upon transient failure.
* @implements {RetryOptionsType}
* @typedef {Object} RetryOptions
* @property {String[]} retryCodes
* @property {number[]} retryCodes
* @property {BackoffSettings} backoffSettings
* @property {(function)} shouldRetryFn
* @property {(function)} getResumptionRequestFn
*/
class RetryOptions {
constructor(retryCodes, backoffSettings) {
constructor(retryCodes, backoffSettings, shouldRetryFn, getResumptionRequestFn) {
this.retryCodes = retryCodes;
this.backoffSettings = backoffSettings;
this.shouldRetryFn = shouldRetryFn;
this.getResumptionRequestFn = getResumptionRequestFn;
}

@@ -133,9 +139,11 @@ }

let retryRequestOptions = this.retryRequestOptions;
// If the user provides a timeout to the method, that timeout value will be used
// to override the backoff settings.
if ('timeout' in options) {
timeout = options.timeout;
}
// If a method-specific timeout is set in the service config, and the retry codes for that
// method are non-null, then that timeout value will be used to
// override backoff settings.
if (retry !== undefined &&
retry !== null &&
retry.retryCodes !== null &&
retry.retryCodes.length > 0) {
if (retry === null || retry === void 0 ? void 0 : retry.retryCodes) {
retry.backoffSettings.initialRpcTimeoutMillis = timeout;

@@ -145,14 +153,2 @@ retry.backoffSettings.maxRpcTimeoutMillis = timeout;

}
// If the user provides a timeout to the method, that timeout value will be used
// to override the backoff settings.
if ('timeout' in options) {
timeout = options.timeout;
if (retry !== undefined &&
retry !== null &&
retry.retryCodes.length > 0) {
retry.backoffSettings.initialRpcTimeoutMillis = timeout;
retry.backoffSettings.maxRpcTimeoutMillis = timeout;
retry.backoffSettings.totalTimeoutMillis = timeout;
}
}
if ('retry' in options) {

@@ -179,3 +175,3 @@ retry = mergeRetryOptions(retry || {}, options.retry);

}
if ('maxRetries' in options) {
if ('maxRetries' in options && options.maxRetries !== undefined) {
retry.backoffSettings.maxRetries = options.maxRetries;

@@ -209,15 +205,95 @@ delete retry.backoffSettings.totalTimeoutMillis;

/**
* Validates passed retry options in preparation for eventual parameter deprecation
* converts retryRequestOptions to retryOptions
* then sets retryRequestOptions to null
*
* @param {CallOptions} options - a list of passed retry option
* @return {CallOptions} A new CallOptions object.
*
*/
function convertRetryOptions(options, gaxStreamingRetries) {
var _a, _b, _c, _d;
// options will be undefined if no CallOptions object is passed at call time
if (!options) {
return options;
}
// if a user provided retry AND retryRequestOptions at call time, throw an error
// otherwise, convert supported parameters
if (!gaxStreamingRetries) {
if (options.retry) {
(0, warnings_1.warn)('legacy_streaming_retry_behavior', 'Legacy streaming retry behavior will not honor settings passed at call time or via client configuration. Please set gaxStreamingRetries to true to utilize passed retry settings. gaxStreamingRetries behavior will be set to true by default in future releases.', 'DeprecationWarning');
}
if (options.retryRequestOptions) {
(0, warnings_1.warn)('legacy_streaming_retry_request_behavior', 'Legacy streaming retry behavior will not honor retryRequestOptions passed at call time. Please set gaxStreamingRetries to true to utilize passed retry settings. gaxStreamingRetries behavior will convert retryRequestOptions to retry parameters by default in future releases.', 'DeprecationWarning');
}
return options;
}
if (options.retry && options.retryRequestOptions) {
throw new Error('Only one of retry or retryRequestOptions may be set');
} // handles parameter conversion from retryRequestOptions to retryOptions
if (options.retryRequestOptions) {
if (options.retryRequestOptions.objectMode !== undefined) {
(0, warnings_1.warn)('retry_request_options', 'objectMode override is not supported. It is set to true internally by default in gax.', 'UnsupportedParameterWarning');
}
if (options.retryRequestOptions.noResponseRetries !== undefined) {
(0, warnings_1.warn)('retry_request_options', 'noResponseRetries override is not supported. Please specify retry codes or a function to determine retry eligibility.', 'UnsupportedParameterWarning');
}
if (options.retryRequestOptions.currentRetryAttempt !== undefined) {
(0, warnings_1.warn)('retry_request_options', 'currentRetryAttempt override is not supported. Retry attempts are tracked internally.', 'UnsupportedParameterWarning');
}
let retryCodes = [status_1.Status.UNAVAILABLE];
let shouldRetryFn;
if (options.retryRequestOptions.shouldRetryFn) {
retryCodes = [];
shouldRetryFn = options.retryRequestOptions.shouldRetryFn;
}
//Backoff settings
options.maxRetries =
(_b = (_a = options === null || options === void 0 ? void 0 : options.retryRequestOptions) === null || _a === void 0 ? void 0 : _a.retries) !== null && _b !== void 0 ? _b : options.maxRetries;
// create a default backoff settings object in case the user didn't provide overrides for everything
const backoffSettings = createDefaultBackoffSettings();
let maxRetryDelayMillis;
let totalTimeoutMillis;
// maxRetryDelay - this is in seconds, need to convert to milliseconds
if (options.retryRequestOptions.maxRetryDelay !== undefined) {
maxRetryDelayMillis = options.retryRequestOptions.maxRetryDelay * 1000;
}
// retryDelayMultiplier - should be a one to one mapping to retryDelayMultiplier
const retryDelayMultiplier = (_d = (_c = options === null || options === void 0 ? void 0 : options.retryRequestOptions) === null || _c === void 0 ? void 0 : _c.retryDelayMultiplier) !== null && _d !== void 0 ? _d : backoffSettings.retryDelayMultiplier;
// this is in seconds and needs to be converted to milliseconds and the totalTimeoutMillis parameter
if (options.retryRequestOptions.totalTimeout !== undefined) {
totalTimeoutMillis = options.retryRequestOptions.totalTimeout * 1000;
}
// for the variables the user wants to override, override in the backoff settings object we made
backoffSettings.maxRetryDelayMillis =
maxRetryDelayMillis !== null && maxRetryDelayMillis !== void 0 ? maxRetryDelayMillis : backoffSettings.maxRetryDelayMillis;
backoffSettings.retryDelayMultiplier =
retryDelayMultiplier !== null && retryDelayMultiplier !== void 0 ? retryDelayMultiplier : backoffSettings.retryDelayMultiplier;
backoffSettings.totalTimeoutMillis =
totalTimeoutMillis !== null && totalTimeoutMillis !== void 0 ? totalTimeoutMillis : backoffSettings.totalTimeoutMillis;
const convertedRetryOptions = createRetryOptions(retryCodes, backoffSettings, shouldRetryFn);
options.retry = convertedRetryOptions;
delete options.retryRequestOptions; // completely remove them to avoid any further confusion
(0, warnings_1.warn)('retry_request_options', 'retryRequestOptions will be deprecated in a future release. Please use retryOptions to pass retry options at call time', 'DeprecationWarning');
}
return options;
}
exports.convertRetryOptions = convertRetryOptions;
/**
* Per-call configurable settings for retrying upon transient failure.
*
* @param {number[]} retryCodes - a list of Google API canonical error codes
* @param {number[]} retryCodes - a list of Google API canonical error codes OR a function that returns a boolean to determine retry behavior
* upon which a retry should be attempted.
* @param {BackoffSettings} backoffSettings - configures the retry
* exponential backoff algorithm.
* @param {function} shouldRetryFn - a function that determines whether a call should retry. If this is defined retryCodes must be empty
* @param {function} getResumptionRequestFn - a function with a resumption strategy - only used with server streaming retries
* @return {RetryOptions} A new RetryOptions object.
*
*/
function createRetryOptions(retryCodes, backoffSettings) {
function createRetryOptions(retryCodes, backoffSettings, shouldRetryFn, getResumptionRequestFn) {
return {
retryCodes,
backoffSettings,
shouldRetryFn,
getResumptionRequestFn,
};

@@ -367,3 +443,3 @@ }

}
let codes = null;
let codes = null; // this is one instance where it will NOT be an array OR a function because we do not allow shouldRetryFn in the client
if (retryCodes && 'retry_codes_name' in methodConfig) {

@@ -399,10 +475,21 @@ const retryCodesName = methodConfig['retry_codes_name'];

}
if (!overrides.retryCodes && !overrides.backoffSettings) {
if (!overrides.retryCodes &&
!overrides.backoffSettings &&
!overrides.shouldRetryFn &&
!overrides.getResumptionRequestFn) {
return retry;
}
const codes = overrides.retryCodes ? overrides.retryCodes : retry.retryCodes;
const retryCodes = overrides.retryCodes
? overrides.retryCodes
: retry.retryCodes;
const backoffSettings = overrides.backoffSettings
? overrides.backoffSettings
: retry.backoffSettings;
return createRetryOptions(codes, backoffSettings);
const shouldRetryFn = overrides.shouldRetryFn
? overrides.shouldRetryFn
: retry.shouldRetryFn;
const getResumptionRequestFn = overrides.getResumptionRequestFn
? overrides.getResumptionRequestFn
: retry.getResumptionRequestFn;
return createRetryOptions(retryCodes, backoffSettings, shouldRetryFn, getResumptionRequestFn);
}

@@ -409,0 +496,0 @@ /**

@@ -86,3 +86,4 @@ "use strict";

canceller = null;
if (retry.retryCodes.indexOf(err.code) < 0) {
if (retry.retryCodes.length > 0 &&
retry.retryCodes.indexOf(err.code) < 0) {
err.note =

@@ -89,0 +90,0 @@ 'Exception occurred in retry method that was ' +

@@ -38,3 +38,2 @@ [

"google/api/monitoring.proto",
"google/api/policy.proto",
"google/api/quota.proto",

@@ -100,37 +99,4 @@ "google/api/resource.proto",

"google/protobuf/compiler/ruby/ruby_generated_pkg_implicit.proto",
"google/protobuf/cpp_features.proto",
"google/protobuf/descriptor.proto",
"google/protobuf/duration.proto",
"google/protobuf/editions/codegen_tests/proto2_enum.proto",
"google/protobuf/editions/codegen_tests/proto2_group.proto",
"google/protobuf/editions/codegen_tests/proto2_import.proto",
"google/protobuf/editions/codegen_tests/proto2_optional.proto",
"google/protobuf/editions/codegen_tests/proto2_packed.proto",
"google/protobuf/editions/codegen_tests/proto2_proto3_enum.proto",
"google/protobuf/editions/codegen_tests/proto2_required.proto",
"google/protobuf/editions/codegen_tests/proto2_unpacked.proto",
"google/protobuf/editions/codegen_tests/proto2_utf8_disabled.proto",
"google/protobuf/editions/codegen_tests/proto2_utf8_lite.proto",
"google/protobuf/editions/codegen_tests/proto2_utf8_verify.proto",
"google/protobuf/editions/codegen_tests/proto3_enum.proto",
"google/protobuf/editions/codegen_tests/proto3_implicit.proto",
"google/protobuf/editions/codegen_tests/proto3_import.proto",
"google/protobuf/editions/codegen_tests/proto3_optional.proto",
"google/protobuf/editions/codegen_tests/proto3_packed.proto",
"google/protobuf/editions/codegen_tests/proto3_unpacked.proto",
"google/protobuf/editions/codegen_tests/proto3_utf8_disabled.proto",
"google/protobuf/editions/codegen_tests/proto3_utf8_strict.proto",
"google/protobuf/editions/golden/editions_transform_proto2.proto",
"google/protobuf/editions/golden/editions_transform_proto2_lite.proto",
"google/protobuf/editions/golden/editions_transform_proto2_utf8_disabled.proto",
"google/protobuf/editions/golden/editions_transform_proto3.proto",
"google/protobuf/editions/golden/editions_transform_proto3_utf8_disabled.proto",
"google/protobuf/editions/golden/simple_proto2.proto",
"google/protobuf/editions/golden/simple_proto2_import.proto",
"google/protobuf/editions/golden/simple_proto3.proto",
"google/protobuf/editions/proto/editions_transform_proto2.proto",
"google/protobuf/editions/proto/editions_transform_proto2_lite.proto",
"google/protobuf/editions/proto/editions_transform_proto2_utf8_disabled.proto",
"google/protobuf/editions/proto/editions_transform_proto3.proto",
"google/protobuf/editions/proto/editions_transform_proto3_utf8_disabled.proto",
"google/protobuf/empty.proto",

@@ -137,0 +103,0 @@ "google/protobuf/field_mask.proto",

@@ -18,3 +18,2 @@ /**

import { Descriptor } from '../descriptor';
import { CallSettings } from '../gax';
import { StreamType } from './streaming';

@@ -28,4 +27,5 @@ /**

rest?: boolean;
constructor(streamType: StreamType, rest?: boolean);
getApiCaller(settings: CallSettings): APICaller;
gaxStreamingRetries?: boolean;
constructor(streamType: StreamType, rest?: boolean, gaxStreamingRetries?: boolean);
getApiCaller(): APICaller;
}

@@ -24,13 +24,12 @@ "use strict";

class StreamDescriptor {
constructor(streamType, rest) {
constructor(streamType, rest, gaxStreamingRetries) {
this.type = streamType;
this.streaming = true;
this.rest = rest;
this.gaxStreamingRetries = gaxStreamingRetries;
}
getApiCaller(settings) {
getApiCaller() {
// Right now retrying does not work with gRPC-streaming, because retryable
// assumes an API call returns an event emitter while gRPC-streaming methods
// return Stream.
// TODO: support retrying.
settings.retry = null;
return new streamingApiCaller_1.StreamingApiCaller(this);

@@ -37,0 +36,0 @@ }

@@ -19,3 +19,4 @@ /**

import { APICallback, CancellableStream, GRPCCallResult, SimpleCallbackFunction } from '../apitypes';
import { RetryRequestOptions } from '../gax';
import { RetryOptions, RetryRequestOptions } from '../gax';
import { GoogleError } from '../googleError';
declare const duplexify: DuplexifyConstructor;

@@ -55,2 +56,7 @@ export interface DuplexifyOptions extends DuplexOptions {

rest?: boolean;
gaxServerStreamingRetries?: boolean;
apiCall?: SimpleCallbackFunction;
argument?: {};
prevDeadline?: number;
retries?: number;
/**

@@ -64,16 +70,62 @@ * StreamProxy is a proxy to gRPC-streaming method.

*/
constructor(type: StreamType, callback: APICallback, rest?: boolean);
constructor(type: StreamType, callback: APICallback, rest?: boolean, gaxServerStreamingRetries?: boolean);
cancel(): void;
retry(stream: CancellableStream, retry: RetryOptions): CancellableStream;
/**
* Helper function to handle total timeout + max retry check for server streaming retries
* @param {number} deadline - the current retry deadline
* @param {number} maxRetries - maximum total number of retries
* @param {number} totalTimeoutMillis - total timeout in milliseconds
*/
throwIfMaxRetriesOrTotalTimeoutExceeded(deadline: number, maxRetries: number, totalTimeoutMillis: number): void;
/**
* Error handler for server streaming retries
* @param {CancellableStream} stream - the stream being retried
* @param {RetryOptions} retry - Configures the exceptions upon which the
* function should retry, and the parameters to the exponential backoff retry
* algorithm.
* @param {Error} error - error to handle
*/
streamHandoffErrorHandler(stream: CancellableStream, retry: RetryOptions, error: Error): void;
/**
* Used during server streaming retries to handle
* event forwarding, errors, and/or stream closure
* @param {CancellableStream} stream - the stream that we're doing the retry on
* @param {RetryOptions} retry - Configures the exceptions upon which the
* function should retry, and the parameters to the exponential backoff retry
* algorithm.
*/
streamHandoffHelper(stream: CancellableStream, retry: RetryOptions): void;
/**
* Forward events from an API request stream to the user's stream.
* @param {Stream} stream - The API request stream.
* @param {RetryOptions} retry - Configures the exceptions upon which the
* function should retry, and the parameters to the exponential backoff retry
* algorithm.
*/
forwardEvents(stream: Stream): void;
defaultShouldRetry(error: GoogleError, retry: RetryOptions): boolean;
/**
* Forward events from an API request stream to the user's stream.
* @param {Stream} stream - The API request stream.
* @param {RetryOptions} retry - Configures the exceptions upon which the
* function eshould retry, and the parameters to the exponential backoff retry
* algorithm.
*/
forwardEventsWithRetries(stream: CancellableStream, retry: RetryOptions): CancellableStream | undefined;
/**
* Resets the target stream as part of the retry process
* @param {CancellableStream} requestStream - the stream to end
*/
resetStreams(requestStream: CancellableStream): void;
/**
* Specifies the target stream.
* @param {ApiCall} apiCall - the API function to be called.
* @param {Object} argument - the argument to be passed to the apiCall.
* @param {RetryOptions} retry - Configures the exceptions upon which the
* function should retry, and the parameters to the exponential backoff retry
* algorithm.
*/
setStream(apiCall: SimpleCallbackFunction, argument: {}, retryRequestOptions?: RetryRequestOptions): void;
setStream(apiCall: SimpleCallbackFunction, argument: {}, retryRequestOptions: RetryRequestOptions | undefined, retry: RetryOptions): void;
}
export {};

@@ -20,2 +20,4 @@ "use strict";

const googleError_1 = require("../googleError");
const streamingRetryRequest_1 = require("../streamingRetryRequest");
const status_1 = require("../status");
// eslint-disable-next-line @typescript-eslint/no-var-requires

@@ -47,3 +49,3 @@ const duplexify = require('duplexify');

*/
constructor(type, callback, rest) {
constructor(type, callback, rest, gaxServerStreamingRetries) {
super(undefined, undefined, {

@@ -54,2 +56,3 @@ objectMode: true,

});
this.retries = 0;
this.type = type;

@@ -60,2 +63,3 @@ this._callback = callback;

this.rest = rest;
this.gaxServerStreamingRetries = gaxServerStreamingRetries;
}

@@ -70,5 +74,137 @@ cancel() {

}
retry(stream, retry) {
let retryArgument = this.argument;
if (typeof retry.getResumptionRequestFn === 'function') {
const resumptionRetryArgument = retry.getResumptionRequestFn(retryArgument);
if (resumptionRetryArgument !== undefined) {
retryArgument = retry.getResumptionRequestFn(retryArgument);
}
}
this.resetStreams(stream);
const newStream = this.apiCall(retryArgument, this._callback);
this.stream = newStream;
this.streamHandoffHelper(newStream, retry);
return newStream;
}
/**
* Helper function to handle total timeout + max retry check for server streaming retries
* @param {number} deadline - the current retry deadline
* @param {number} maxRetries - maximum total number of retries
* @param {number} totalTimeoutMillis - total timeout in milliseconds
*/
throwIfMaxRetriesOrTotalTimeoutExceeded(deadline, maxRetries, totalTimeoutMillis) {
const now = new Date();
if (this.prevDeadline !== undefined &&
deadline &&
now.getTime() >= this.prevDeadline) {
const error = new googleError_1.GoogleError(`Total timeout of API exceeded ${totalTimeoutMillis} milliseconds before any response was received.`);
error.code = status_1.Status.DEADLINE_EXCEEDED;
this.emit('error', error);
this.destroy();
// Without throwing error you get unhandled error since we are returning a new stream
// There might be a better way to do this
throw error;
}
if (this.retries && this.retries >= maxRetries) {
const error = new googleError_1.GoogleError('Exceeded maximum number of retries before any ' +
'response was received');
error.code = status_1.Status.DEADLINE_EXCEEDED;
this.emit('error', error);
this.destroy();
throw error;
}
}
/**
* Error handler for server streaming retries
* @param {CancellableStream} stream - the stream being retried
* @param {RetryOptions} retry - Configures the exceptions upon which the
* function should retry, and the parameters to the exponential backoff retry
* algorithm.
* @param {Error} error - error to handle
*/
streamHandoffErrorHandler(stream, retry, error) {
let retryStream = this.stream;
const delayMult = retry.backoffSettings.retryDelayMultiplier;
const maxDelay = retry.backoffSettings.maxRetryDelayMillis;
const timeoutMult = retry.backoffSettings.rpcTimeoutMultiplier;
const maxTimeout = retry.backoffSettings.maxRpcTimeoutMillis;
let delay = retry.backoffSettings.initialRetryDelayMillis;
let timeout = retry.backoffSettings.initialRpcTimeoutMillis;
let now = new Date();
let deadline = 0;
if (retry.backoffSettings.totalTimeoutMillis) {
deadline = now.getTime() + retry.backoffSettings.totalTimeoutMillis;
}
const maxRetries = retry.backoffSettings.maxRetries;
try {
this.throwIfMaxRetriesOrTotalTimeoutExceeded(deadline, maxRetries, retry.backoffSettings.totalTimeoutMillis);
}
catch (error) {
return;
}
this.retries++;
const e = googleError_1.GoogleError.parseGRPCStatusDetails(error);
let shouldRetry = this.defaultShouldRetry(e, retry);
if (retry.shouldRetryFn) {
shouldRetry = retry.shouldRetryFn(e);
}
if (shouldRetry) {
const toSleep = Math.random() * delay;
setTimeout(() => {
now = new Date();
delay = Math.min(delay * delayMult, maxDelay);
const timeoutCal = timeout && timeoutMult ? timeout * timeoutMult : 0;
const rpcTimeout = maxTimeout ? maxTimeout : 0;
this.prevDeadline = deadline;
const newDeadline = deadline ? deadline - now.getTime() : 0;
timeout = Math.min(timeoutCal, rpcTimeout, newDeadline);
}, toSleep);
}
else {
e.note =
'Exception occurred in retry method that was ' +
'not classified as transient';
// for some reason this error must be emitted here
// instead of the destroy, otherwise the error event
// is swallowed
this.emit('error', e);
this.destroy();
return;
}
retryStream = this.retry(stream, retry);
this.stream = retryStream;
return;
}
/**
* Used during server streaming retries to handle
* event forwarding, errors, and/or stream closure
* @param {CancellableStream} stream - the stream that we're doing the retry on
* @param {RetryOptions} retry - Configures the exceptions upon which the
* function should retry, and the parameters to the exponential backoff retry
* algorithm.
*/
streamHandoffHelper(stream, retry) {
let enteredError = false;
const eventsToForward = ['metadata', 'response', 'status', 'data'];
eventsToForward.forEach(event => {
stream.on(event, this.emit.bind(this, event));
});
stream.on('error', error => {
enteredError = true;
this.streamHandoffErrorHandler(stream, retry, error);
});
stream.on('end', () => {
if (!enteredError) {
enteredError = true;
this.emit('end');
this.cancel();
}
});
}
/**
* Forward events from an API request stream to the user's stream.
* @param {Stream} stream - The API request stream.
* @param {RetryOptions} retry - Configures the exceptions upon which the
* function should retry, and the parameters to the exponential backoff retry
* algorithm.
*/

@@ -113,8 +249,118 @@ forwardEvents(stream) {

}
defaultShouldRetry(error, retry) {
if (retry.retryCodes.length > 0 &&
retry.retryCodes.indexOf(error.code) < 0) {
return false;
}
return true;
}
/**
* Forward events from an API request stream to the user's stream.
* @param {Stream} stream - The API request stream.
* @param {RetryOptions} retry - Configures the exceptions upon which the
* function eshould retry, and the parameters to the exponential backoff retry
* algorithm.
*/
forwardEventsWithRetries(stream, retry) {
let retryStream = this.stream;
const eventsToForward = ['metadata', 'response', 'status'];
eventsToForward.forEach(event => {
stream.on(event, this.emit.bind(this, event));
});
// gRPC is guaranteed emit the 'status' event but not 'metadata', and 'status' is the last event to emit.
// Emit the 'response' event if stream has no 'metadata' event.
// This avoids the stream swallowing the other events, such as 'end'.
stream.on('status', () => {
if (!this._responseHasSent) {
stream.emit('response', {
code: 200,
details: '',
message: 'OK',
});
}
});
// We also want to supply the status data as 'response' event to support
// the behavior of google-cloud-node expects.
// see:
// https://github.com/GoogleCloudPlatform/google-cloud-node/pull/1775#issuecomment-259141029
// https://github.com/GoogleCloudPlatform/google-cloud-node/blob/116436fa789d8b0f7fc5100b19b424e3ec63e6bf/packages/common/src/grpc-service.js#L355
stream.on('metadata', metadata => {
// Create a response object with succeeds.
// TODO: unify this logic with the decoration of gRPC response when it's
// added. see: https://github.com/googleapis/gax-nodejs/issues/65
stream.emit('response', {
code: 200,
details: '',
message: 'OK',
metadata,
});
this._responseHasSent = true;
});
stream.on('error', error => {
const timeout = retry.backoffSettings.totalTimeoutMillis;
const maxRetries = retry.backoffSettings.maxRetries;
if ((maxRetries && maxRetries > 0) || (timeout && timeout > 0)) {
const e = googleError_1.GoogleError.parseGRPCStatusDetails(error);
let shouldRetry = this.defaultShouldRetry(e, retry);
if (retry.shouldRetryFn) {
shouldRetry = retry.shouldRetryFn(e);
}
if (shouldRetry) {
if (maxRetries && timeout) {
const newError = new googleError_1.GoogleError('Cannot set both totalTimeoutMillis and maxRetries ' +
'in backoffSettings.');
newError.code = status_1.Status.INVALID_ARGUMENT;
this.emit('error', newError);
this.destroy();
return; //end chunk
}
else {
retryStream = this.retry(stream, retry);
this.stream = retryStream;
return retryStream;
}
}
else {
const e = googleError_1.GoogleError.parseGRPCStatusDetails(error);
e.note =
'Exception occurred in retry method that was ' +
'not classified as transient';
this.destroy(e);
return; // end chunk
}
}
else {
return googleError_1.GoogleError.parseGRPCStatusDetails(error);
}
});
return retryStream;
}
/**
* Resets the target stream as part of the retry process
* @param {CancellableStream} requestStream - the stream to end
*/
resetStreams(requestStream) {
if (requestStream) {
requestStream.cancel && requestStream.cancel();
if (requestStream.destroy) {
requestStream.destroy();
}
else if (requestStream.end) {
// TODO: not used in server streaming, but likely needed
// if we want to add BIDI or client side streaming
requestStream.end();
}
}
}
/**
* Specifies the target stream.
* @param {ApiCall} apiCall - the API function to be called.
* @param {Object} argument - the argument to be passed to the apiCall.
* @param {RetryOptions} retry - Configures the exceptions upon which the
* function should retry, and the parameters to the exponential backoff retry
* algorithm.
*/
setStream(apiCall, argument, retryRequestOptions = {}) {
setStream(apiCall, argument, retryRequestOptions = {}, retry) {
this.apiCall = apiCall;
this.argument = argument;
if (this.type === StreamType.SERVER_STREAMING) {

@@ -126,2 +372,19 @@ if (this.rest) {

}
else if (this.gaxServerStreamingRetries) {
const retryStream = (0, streamingRetryRequest_1.streamingRetryRequest)({
request: () => {
if (this._isCancelCalled) {
if (this.stream) {
this.stream.cancel();
}
return;
}
const stream = apiCall(argument, this._callback);
this.stream = stream;
this.stream = this.forwardEventsWithRetries(stream, retry);
return this.stream;
},
});
this.setReadable(retryStream);
}
else {

@@ -128,0 +391,0 @@ const retryStream = retryRequest(null, {

@@ -32,3 +32,3 @@ "use strict";

init(callback) {
return new streaming_1.StreamProxy(this.descriptor.type, callback, this.descriptor.rest);
return new streaming_1.StreamProxy(this.descriptor.type, callback, this.descriptor.rest, this.descriptor.gaxStreamingRetries);
}

@@ -55,3 +55,3 @@ wrap(func) {

call(apiCall, argument, settings, stream) {
stream.setStream(apiCall, argument, settings.retryRequestOptions);
stream.setStream(apiCall, argument, settings.retryRequestOptions, settings.retry);
}

@@ -58,0 +58,0 @@ fail(stream, err) {

{
"name": "google-gax",
"version": "4.0.6-experimental",
"version": "4.1.0",
"description": "Google API Extensions",

@@ -26,3 +26,3 @@ "main": "build/src/index.js",

"devDependencies": {
"@compodoc/compodoc": "1.1.21",
"@compodoc/compodoc": "1.1.23",
"@babel/plugin-proposal-private-methods": "^7.18.6",

@@ -36,3 +36,3 @@ "@types/mocha": "^9.0.0",

"@types/pumpify": "^1.4.1",
"@types/sinon": "^10.0.0",
"@types/sinon": "^17.0.0",
"@types/uglify-js": "^3.17.0",

@@ -42,3 +42,3 @@ "c8": "^8.0.0",

"execa": "^5.0.0",
"google-proto-files": "4.0.0-experimental",
"google-proto-files": "^4.1.0",
"gts": "^5.0.0",

@@ -55,3 +55,3 @@ "linkinator": "^4.0.0",

"rimraf": "^5.0.1",
"sinon": "^16.0.0",
"sinon": "^17.0.0",
"stream-events": "^1.0.4",

@@ -58,0 +58,0 @@ "ts-loader": "^8.0.0",

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is too big to display

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc