@google-cloud/pubsub
Advanced tools
Comparing version 3.0.3 to 3.1.0
@@ -16,2 +16,3 @@ /*! | ||
*/ | ||
import { Duration } from './temporal'; | ||
export declare const defaultOptions: { | ||
@@ -21,4 +22,7 @@ subscription: { | ||
maxOutstandingBytes: number; | ||
minAckDeadline: undefined; | ||
maxAckDeadline: Duration; | ||
maxExtensionMinutes: number; | ||
maxStreams: number; | ||
ackDeadline: number; | ||
}; | ||
@@ -25,0 +29,0 @@ publish: { |
@@ -19,2 +19,3 @@ "use strict"; | ||
exports.defaultOptions = void 0; | ||
const temporal_1 = require("./temporal"); | ||
// These options will be used library-wide. They're specified here so that | ||
@@ -30,2 +31,6 @@ // they can be changed easily in the future. | ||
maxOutstandingBytes: 100 * 1024 * 1024, | ||
// The minimum length of time a message's lease will be extended by. | ||
minAckDeadline: undefined, | ||
// The maximum length of time a message's lease will be extended by. | ||
maxAckDeadline: temporal_1.Duration.from({ minutes: 10 }), | ||
// The maximum number of minutes that a message's lease will ever | ||
@@ -37,2 +42,4 @@ // be extended. | ||
maxStreams: 5, | ||
// The starting number of seconds that ack deadlines will be extended. | ||
ackDeadline: 10, | ||
}, | ||
@@ -39,0 +46,0 @@ publish: { |
@@ -97,3 +97,4 @@ /*! | ||
export { CreateTopicCallback, CreateTopicResponse, GetTopicCallback, GetTopicResponse, GetTopicOptions, GetTopicMetadataCallback, GetTopicMetadataResponse, GetTopicSubscriptionsCallback, GetTopicSubscriptionsResponse, SetTopicMetadataCallback, SetTopicMetadataResponse, Topic, TopicMetadata, } from './topic'; | ||
export { Duration, TotalOfUnit, DurationLike } from './temporal'; | ||
import * as protos from '../protos/protos'; | ||
export { protos }; |
@@ -18,3 +18,3 @@ "use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
exports.protos = exports.Topic = exports.Subscription = exports.Encodings = exports.SchemaViews = exports.SchemaTypes = exports.Schema = exports.Message = exports.Snapshot = exports.PubSub = exports.PublishError = exports.IAM = exports.v1 = void 0; | ||
exports.protos = exports.Duration = exports.Topic = exports.Subscription = exports.Encodings = exports.SchemaViews = exports.SchemaTypes = exports.Schema = exports.Message = exports.Snapshot = exports.PubSub = exports.PublishError = exports.IAM = exports.v1 = void 0; | ||
/** | ||
@@ -108,2 +108,4 @@ * @namespace google.pubsub.v1 | ||
Object.defineProperty(exports, "Topic", { enumerable: true, get: function () { return topic_1.Topic; } }); | ||
var temporal_1 = require("./temporal"); | ||
Object.defineProperty(exports, "Duration", { enumerable: true, get: function () { return temporal_1.Duration; } }); | ||
if (process.env.DEBUG_GRPC) { | ||
@@ -110,0 +112,0 @@ console.info('gRPC logging set to verbose'); |
@@ -39,3 +39,3 @@ /*! | ||
* @property {number} [maxExtensionMinutes=60] The maximum duration (in minutes) | ||
* to extend the message deadline before redelivering. | ||
* to extend the message deadline before redelivering. | ||
* @property {number} [maxMessages=1000] The desired number of messages to allow | ||
@@ -42,0 +42,0 @@ * in memory before pausing the message stream. Unless allowExcessMessages |
@@ -33,3 +33,3 @@ "use strict"; | ||
* @property {number} [maxExtensionMinutes=60] The maximum duration (in minutes) | ||
* to extend the message deadline before redelivering. | ||
* to extend the message deadline before redelivering. | ||
* @property {number} [maxMessages=1000] The desired number of messages to allow | ||
@@ -36,0 +36,0 @@ * in memory before pausing the message stream. Unless allowExcessMessages |
@@ -26,2 +26,3 @@ /*! | ||
import { SubscriberClient } from './v1'; | ||
import { Duration } from './temporal'; | ||
export declare type PullResponse = google.pubsub.v1.IStreamingPullResponse; | ||
@@ -113,15 +114,14 @@ export declare type SubscriptionProperties = google.pubsub.v1.StreamingPullResponse.ISubscriptionProperties; | ||
} | ||
export interface SubscriberOptions { | ||
ackDeadline?: number; | ||
batching?: BatchOptions; | ||
flowControl?: FlowControlOptions; | ||
useLegacyFlowControl?: boolean; | ||
streamingOptions?: MessageStreamOptions; | ||
enableOpenTelemetryTracing?: boolean; | ||
} | ||
/** | ||
* @typedef {object} SubscriberOptions | ||
* @property {number} [ackDeadline=10] Acknowledge deadline in seconds. If left | ||
* unset the initial value will be 10 seconds, but it will evolve into the | ||
* 99th percentile time it takes to acknowledge a message. | ||
* unset, the initial value will be 10 seconds, but it will evolve into the | ||
* 99th percentile time it takes to acknowledge a message, subject to the | ||
* limitations of minAckDeadline and maxAckDeadline. If ackDeadline is set | ||
* by the user, then the min/max values will be set to match it. New code | ||
* should prefer setting minAckDeadline and maxAckDeadline directly. | ||
* @property {Duration} [minAckDeadline] The minimum time that ackDeadline should | ||
* ever have, while it's under library control. | ||
* @property {Duration} [maxAckDeadline] The maximum time that ackDeadline should | ||
* ever have, while it's under library control. | ||
* @property {BatchOptions} [batching] Request batching options. | ||
@@ -134,2 +134,13 @@ * @property {FlowControlOptions} [flowControl] Flow control options. | ||
*/ | ||
export interface SubscriberOptions { | ||
/** @deprecated Use minAckDeadline and maxAckDeadline. */ | ||
ackDeadline?: number; | ||
minAckDeadline?: Duration; | ||
maxAckDeadline?: Duration; | ||
batching?: BatchOptions; | ||
flowControl?: FlowControlOptions; | ||
useLegacyFlowControl?: boolean; | ||
streamingOptions?: MessageStreamOptions; | ||
enableOpenTelemetryTracing?: boolean; | ||
} | ||
/** | ||
@@ -153,3 +164,2 @@ * Subscriber class is used to manage all message related functionality. | ||
private _inventory; | ||
private _isUserSetDeadline; | ||
private _useOpentelemetry; | ||
@@ -166,4 +176,25 @@ private _latencies; | ||
/** | ||
* Sets our subscription properties from the first incoming message. | ||
* Update our ack extension time that will be used by the lease manager | ||
* for sending modAcks. | ||
* | ||
* Should not be called from outside this class, except for unit tests. | ||
* | ||
* @param {number} [ackTimeSeconds] The number of seconds that the last | ||
* ack took after the message was received. If this is undefined, then | ||
* we won't update the histogram, but we will still recalculate the | ||
* ackDeadline based on the situation. | ||
* | ||
* @private | ||
*/ | ||
updateAckDeadline(ackTimeSeconds?: number): void; | ||
private getMinMaxDeadlines; | ||
/** | ||
* Returns true if an exactly once subscription has been detected. | ||
* | ||
* @private | ||
*/ | ||
get isExactlyOnce(): boolean; | ||
/** | ||
* Sets our subscription properties from incoming messages. | ||
* | ||
* @param {SubscriptionProperties} subscriptionProperties The new properties. | ||
@@ -170,0 +201,0 @@ * @private |
@@ -32,2 +32,3 @@ "use strict"; | ||
const util_1 = require("./util"); | ||
const temporal_1 = require("./temporal"); | ||
/** | ||
@@ -191,15 +192,4 @@ * Date object with nanosecond precision. Supports all standard Date arguments | ||
exports.Message = Message; | ||
const minAckDeadlineForExactlyOnce = temporal_1.Duration.from({ seconds: 60 }); | ||
/** | ||
* @typedef {object} SubscriberOptions | ||
* @property {number} [ackDeadline=10] Acknowledge deadline in seconds. If left | ||
* unset the initial value will be 10 seconds, but it will evolve into the | ||
* 99th percentile time it takes to acknowledge a message. | ||
* @property {BatchOptions} [batching] Request batching options. | ||
* @property {FlowControlOptions} [flowControl] Flow control options. | ||
* @property {boolean} [useLegacyFlowControl] Disables enforcing flow control | ||
* settings at the Cloud PubSub server and uses the less accurate method | ||
* of only enforcing flow control at the client side. | ||
* @property {MessageStreamOptions} [streamingOptions] Streaming options. | ||
*/ | ||
/** | ||
* Subscriber class is used to manage all message related functionality. | ||
@@ -216,3 +206,3 @@ * | ||
super(); | ||
this.ackDeadline = 10; | ||
this.ackDeadline = default_options_1.defaultOptions.subscription.ackDeadline; | ||
this.maxMessages = default_options_1.defaultOptions.subscription.maxOutstandingMessages; | ||
@@ -222,3 +212,2 @@ this.maxBytes = default_options_1.defaultOptions.subscription.maxOutstandingBytes; | ||
this.isOpen = false; | ||
this._isUserSetDeadline = false; | ||
this._useOpentelemetry = false; | ||
@@ -232,4 +221,61 @@ this._histogram = new histogram_1.Histogram({ min: 10, max: 600 }); | ||
/** | ||
* Sets our subscription properties from the first incoming message. | ||
* Update our ack extension time that will be used by the lease manager | ||
* for sending modAcks. | ||
* | ||
* Should not be called from outside this class, except for unit tests. | ||
* | ||
* @param {number} [ackTimeSeconds] The number of seconds that the last | ||
* ack took after the message was received. If this is undefined, then | ||
* we won't update the histogram, but we will still recalculate the | ||
* ackDeadline based on the situation. | ||
* | ||
* @private | ||
*/ | ||
updateAckDeadline(ackTimeSeconds) { | ||
// Start with the value we already have. | ||
let ackDeadline = this.ackDeadline; | ||
// If we got an ack time reading, update the histogram (and ackDeadline). | ||
if (ackTimeSeconds) { | ||
this._histogram.add(ackTimeSeconds); | ||
ackDeadline = this._histogram.percentile(99); | ||
} | ||
// Grab our current min/max deadline values, based on whether exactly-once | ||
// is enabled, and the defaults. | ||
const [minDeadline, maxDeadline] = this.getMinMaxDeadlines(); | ||
if (minDeadline) { | ||
ackDeadline = Math.max(ackDeadline, minDeadline.totalOf('second')); | ||
} | ||
if (maxDeadline) { | ||
ackDeadline = Math.min(ackDeadline, maxDeadline.totalOf('second')); | ||
} | ||
// Set the bounded result back. | ||
this.ackDeadline = ackDeadline; | ||
} | ||
getMinMaxDeadlines() { | ||
var _a, _b; | ||
// If this is an exactly-once subscription, and the user didn't set their | ||
// own minimum ack periods, set it to the default for exactly-once. | ||
const defaultMinDeadline = this.isExactlyOnce | ||
? minAckDeadlineForExactlyOnce | ||
: default_options_1.defaultOptions.subscription.minAckDeadline; | ||
const defaultMaxDeadline = default_options_1.defaultOptions.subscription.maxAckDeadline; | ||
// Pull in any user-set min/max. | ||
const minDeadline = (_a = this._options.minAckDeadline) !== null && _a !== void 0 ? _a : defaultMinDeadline; | ||
const maxDeadline = (_b = this._options.maxAckDeadline) !== null && _b !== void 0 ? _b : defaultMaxDeadline; | ||
return [minDeadline, maxDeadline]; | ||
} | ||
/** | ||
* Returns true if an exactly once subscription has been detected. | ||
* | ||
* @private | ||
*/ | ||
get isExactlyOnce() { | ||
if (!this.subscriptionProperties) { | ||
return false; | ||
} | ||
return !!this.subscriptionProperties.exactlyOnceDeliveryEnabled; | ||
} | ||
/** | ||
* Sets our subscription properties from incoming messages. | ||
* | ||
* @param {SubscriptionProperties} subscriptionProperties The new properties. | ||
@@ -239,5 +285,7 @@ * @private | ||
setSubscriptionProperties(subscriptionProperties) { | ||
const previouslyEnabled = this.isExactlyOnce; | ||
this.subscriptionProperties = subscriptionProperties; | ||
// If this is an exactly-once subscription, warn the user that they may have difficulty. | ||
// If this is an exactly-once subscription... | ||
if (this.subscriptionProperties.exactlyOnceDeliveryEnabled) { | ||
// Warn the user that they may have difficulty. | ||
this._errorLog.doMaybe(() => console.error('WARNING: Exactly-once subscriptions are not yet supported ' + | ||
@@ -247,2 +295,6 @@ 'by the Node client library. This feature will be added ' + | ||
} | ||
// Update ackDeadline in case the flag switched. | ||
if (previouslyEnabled !== this.isExactlyOnce) { | ||
this.updateAckDeadline(); | ||
} | ||
} | ||
@@ -284,7 +336,4 @@ /** | ||
async ack(message) { | ||
if (!this._isUserSetDeadline) { | ||
const ackTimeSeconds = (Date.now() - message.received) / 1000; | ||
this._histogram.add(ackTimeSeconds); | ||
this.ackDeadline = this._histogram.percentile(99); | ||
} | ||
const ackTimeSeconds = (Date.now() - message.received) / 1000; | ||
this.updateAckDeadline(ackTimeSeconds); | ||
this._acks.add(message); | ||
@@ -380,5 +429,9 @@ await this._acks.onFlush(); | ||
this._useOpentelemetry = options.enableOpenTelemetryTracing || false; | ||
if (options.ackDeadline) { | ||
this.ackDeadline = options.ackDeadline; | ||
this._isUserSetDeadline = true; | ||
// The user-set ackDeadline value basically pegs the extension time. | ||
// We'll emulate it by overwriting min/max. | ||
const passedAckDeadline = options.ackDeadline; | ||
if (passedAckDeadline !== undefined) { | ||
this.ackDeadline = passedAckDeadline; | ||
options.minAckDeadline = temporal_1.Duration.from({ seconds: passedAckDeadline }); | ||
options.maxAckDeadline = temporal_1.Duration.from({ seconds: passedAckDeadline }); | ||
} | ||
@@ -385,0 +438,0 @@ this.useLegacyFlowControl = options.useLegacyFlowControl || false; |
{ | ||
"name": "@google-cloud/pubsub", | ||
"description": "Cloud Pub/Sub Client Library for Node.js", | ||
"version": "3.0.3", | ||
"version": "3.1.0", | ||
"license": "Apache-2.0", | ||
@@ -6,0 +6,0 @@ "author": "Google Inc.", |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is too big to display
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
3091593
97
51872