@google-cloud/pubsub
Advanced tools
Comparing version 3.1.1 to 3.2.0
@@ -95,3 +95,3 @@ /*! | ||
export { Schema, CreateSchemaResponse, ISchema, SchemaType, SchemaTypes, ICreateSchemaRequest, SchemaEncoding, SchemaView, SchemaViews, Encodings, SchemaMessageMetadata, } from './schema'; | ||
export { PushConfig, SubscriptionMetadata, SubscriptionOptions, SubscriptionCloseCallback, CreateSubscriptionOptions, CreateSubscriptionCallback, CreateSubscriptionResponse, GetSubscriptionOptions, GetSubscriptionCallback, GetSubscriptionResponse, GetSubscriptionMetadataCallback, GetSubscriptionMetadataResponse, SetSubscriptionMetadataCallback, SetSubscriptionMetadataResponse, Subscription, } from './subscription'; | ||
export { PushConfig, SubscriptionMetadata, SubscriptionOptions, SubscriptionCloseCallback, CreateSubscriptionOptions, CreateSubscriptionCallback, CreateSubscriptionResponse, GetSubscriptionOptions, GetSubscriptionCallback, GetSubscriptionResponse, GetSubscriptionMetadataCallback, GetSubscriptionMetadataResponse, SetSubscriptionMetadataCallback, SetSubscriptionMetadataResponse, Subscription, AckError, AckResponse, AckResponses, } from './subscription'; | ||
export { CreateTopicCallback, CreateTopicResponse, GetTopicCallback, GetTopicResponse, GetTopicOptions, GetTopicMetadataCallback, GetTopicMetadataResponse, GetTopicSubscriptionsCallback, GetTopicSubscriptionsResponse, SetTopicMetadataCallback, SetTopicMetadataResponse, Topic, TopicMetadata, } from './topic'; | ||
@@ -98,0 +98,0 @@ export { Duration, TotalOfUnit, DurationLike } from './temporal'; |
@@ -18,3 +18,3 @@ "use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
exports.protos = exports.Duration = exports.Topic = exports.Subscription = exports.Encodings = exports.SchemaViews = exports.SchemaTypes = exports.Schema = exports.Message = exports.Snapshot = exports.PubSub = exports.PublishError = exports.IAM = exports.v1 = void 0; | ||
exports.protos = exports.Duration = exports.Topic = exports.AckResponses = exports.AckError = exports.Subscription = exports.Encodings = exports.SchemaViews = exports.SchemaTypes = exports.Schema = exports.Message = exports.Snapshot = exports.PubSub = exports.PublishError = exports.IAM = exports.v1 = void 0; | ||
/** | ||
@@ -106,2 +106,4 @@ * @namespace google.pubsub.v1 | ||
Object.defineProperty(exports, "Subscription", { enumerable: true, get: function () { return subscription_1.Subscription; } }); | ||
Object.defineProperty(exports, "AckError", { enumerable: true, get: function () { return subscription_1.AckError; } }); | ||
Object.defineProperty(exports, "AckResponses", { enumerable: true, get: function () { return subscription_1.AckResponses; } }); | ||
var topic_1 = require("./topic"); | ||
@@ -108,0 +110,0 @@ Object.defineProperty(exports, "Topic", { enumerable: true, get: function () { return topic_1.Topic; } }); |
@@ -17,6 +17,19 @@ /*! | ||
/// <reference types="node" /> | ||
import { CallOptions, grpc } from 'google-gax'; | ||
import { CallOptions, GoogleError, grpc } from 'google-gax'; | ||
import defer = require('p-defer'); | ||
import { Message, Subscriber } from './subscriber'; | ||
declare type QueuedMessages = Array<[string, number?]>; | ||
import { ExponentialRetry } from './exponential-retry'; | ||
import { AckResponse, Message, Subscriber } from './subscriber'; | ||
/** | ||
* @private | ||
*/ | ||
export interface QueuedMessage { | ||
ackId: string; | ||
deadline?: number; | ||
responsePromise?: defer.DeferredPromise<void>; | ||
retryCount: number; | ||
} | ||
/** | ||
* @private | ||
*/ | ||
export declare type QueuedMessages = Array<QueuedMessage>; | ||
export interface BatchOptions { | ||
@@ -30,13 +43,15 @@ callOptions?: CallOptions; | ||
* | ||
* Now that we have exactly-once delivery subscriptions, we'll only | ||
* throw one of these if there was an unknown error. | ||
* | ||
* @class | ||
* | ||
* @param {string} message The error message. | ||
* @param {ServiceError} err The grpc service error. | ||
* @param {GoogleError} err The grpc error. | ||
*/ | ||
export declare class BatchError extends Error implements grpc.ServiceError { | ||
export declare class BatchError extends Error { | ||
ackIds: string[]; | ||
code: grpc.status; | ||
details: string; | ||
metadata: grpc.Metadata; | ||
constructor(err: grpc.ServiceError, ackIds: string[], rpc: string); | ||
constructor(err: GoogleError, ackIds: string[], rpc: string); | ||
} | ||
@@ -65,2 +80,3 @@ /** | ||
numInFlightRequests: number; | ||
numInRetryRequests: number; | ||
protected _onFlush?: defer.DeferredPromise<void>; | ||
@@ -72,5 +88,18 @@ protected _onDrain?: defer.DeferredPromise<void>; | ||
protected _timer?: NodeJS.Timer; | ||
protected abstract _sendBatch(batch: QueuedMessages): Promise<void>; | ||
protected _retrier: ExponentialRetry<QueuedMessage>; | ||
protected _closed: boolean; | ||
protected abstract _sendBatch(batch: QueuedMessages): Promise<QueuedMessages>; | ||
constructor(sub: Subscriber, options?: BatchOptions); | ||
/** | ||
* Shuts down this message queue gracefully. Any acks/modAcks pending in | ||
* the queue or waiting for retry will be removed. If exactly-once delivery | ||
* is enabled on the subscription, we'll send permanent failures to | ||
* anyone waiting on completions; otherwise we'll send successes. | ||
* | ||
* If a flush is desired first, do it before calling close(). | ||
* | ||
* @private | ||
*/ | ||
close(): void; | ||
/** | ||
* Gets the default buffer time in ms. | ||
@@ -89,4 +118,17 @@ * | ||
*/ | ||
add({ ackId }: Message, deadline?: number): void; | ||
add({ ackId }: Message, deadline?: number): Promise<void>; | ||
/** | ||
* Retry handler for acks/modacks that have transient failures. Unless | ||
* it's passed the final deadline, we will just re-queue it for sending. | ||
* | ||
* @private | ||
*/ | ||
private handleRetry; | ||
/** | ||
* This hook lets a subclass tell the retry handler to go ahead and fail early. | ||
* | ||
* @private | ||
*/ | ||
protected shouldFailEarly(message: QueuedMessage): boolean; | ||
/** | ||
* Sends a batch of messages. | ||
@@ -114,2 +156,33 @@ * @private | ||
setOptions(options: BatchOptions): void; | ||
/** | ||
* Succeed a whole batch of Acks/Modacks for an OK RPC response. | ||
* | ||
* @private | ||
*/ | ||
handleAckSuccesses(batch: QueuedMessages): void; | ||
/** | ||
* If we get an RPC failure of any kind, this will take care of deciding | ||
* what to do for each related ack/modAck. Successful ones will have their | ||
* Promises resolved, permanent errors will have their Promises rejected, | ||
* and transients will be returned for retry. | ||
* | ||
* Note that this is only used for subscriptions with exactly-once | ||
* delivery enabled, so _sendBatch() in the classes below take care of | ||
* resolving errors to success; they don't make it here. | ||
* | ||
* @private | ||
*/ | ||
handleAckFailures(operation: string, batch: QueuedMessages, rpcError: GoogleError): { | ||
toError: Map<AckResponse, QueuedMessages>; | ||
toRetry: QueuedMessages; | ||
}; | ||
/** | ||
* Since we handle our own retries for ack/modAck calls when exactly-once | ||
* delivery is enabled on a subscription, we conditionally need to disable | ||
* the gax retries. This returns an appropriate CallOptions for the | ||
* subclasses to pass down. | ||
* | ||
* @private | ||
*/ | ||
protected getCallOptions(): CallOptions | undefined; | ||
} | ||
@@ -131,3 +204,3 @@ /** | ||
*/ | ||
protected _sendBatch(batch: QueuedMessages): Promise<void>; | ||
protected _sendBatch(batch: QueuedMessages): Promise<QueuedMessages>; | ||
} | ||
@@ -150,4 +223,4 @@ /** | ||
*/ | ||
protected _sendBatch(batch: QueuedMessages): Promise<void>; | ||
protected _sendBatch(batch: QueuedMessages): Promise<QueuedMessages>; | ||
protected shouldFailEarly(message: QueuedMessage): boolean; | ||
} | ||
export {}; |
@@ -19,10 +19,19 @@ "use strict"; | ||
exports.ModAckQueue = exports.AckQueue = exports.MessageQueue = exports.BatchError = void 0; | ||
const google_gax_1 = require("google-gax"); | ||
const defer = require("p-defer"); | ||
const ack_metadata_1 = require("./ack-metadata"); | ||
const exponential_retry_1 = require("./exponential-retry"); | ||
const subscriber_1 = require("./subscriber"); | ||
const temporal_1 = require("./temporal"); | ||
const util_1 = require("./util"); | ||
/** | ||
* Error class used to signal a batch failure. | ||
* | ||
* Now that we have exactly-once delivery subscriptions, we'll only | ||
* throw one of these if there was an unknown error. | ||
* | ||
* @class | ||
* | ||
* @param {string} message The error message. | ||
* @param {ServiceError} err The grpc service error. | ||
* @param {GoogleError} err The grpc error. | ||
*/ | ||
@@ -34,4 +43,3 @@ class BatchError extends Error { | ||
this.code = err.code; | ||
this.details = err.details; | ||
this.metadata = err.metadata; | ||
this.details = err.message; | ||
} | ||
@@ -61,9 +69,40 @@ } | ||
constructor(sub, options = {}) { | ||
this._closed = false; | ||
this.numPendingRequests = 0; | ||
this.numInFlightRequests = 0; | ||
this.numInRetryRequests = 0; | ||
this._requests = []; | ||
this._subscriber = sub; | ||
this._retrier = new exponential_retry_1.ExponentialRetry(temporal_1.Duration.from({ seconds: 1 }), temporal_1.Duration.from({ seconds: 64 })); | ||
this.setOptions(options); | ||
} | ||
/** | ||
* Shuts down this message queue gracefully. Any acks/modAcks pending in | ||
* the queue or waiting for retry will be removed. If exactly-once delivery | ||
* is enabled on the subscription, we'll send permanent failures to | ||
* anyone waiting on completions; otherwise we'll send successes. | ||
* | ||
* If a flush is desired first, do it before calling close(). | ||
* | ||
* @private | ||
*/ | ||
close() { | ||
let requests = this._requests; | ||
this._requests = []; | ||
this.numInFlightRequests = this.numPendingRequests = 0; | ||
requests = requests.concat(this._retrier.close()); | ||
const isExactlyOnceDelivery = this._subscriber.isExactlyOnceDelivery; | ||
requests.forEach(r => { | ||
if (r.responsePromise) { | ||
if (isExactlyOnceDelivery) { | ||
r.responsePromise.reject(new subscriber_1.AckError(subscriber_1.AckResponses.Invalid, 'Subscriber closed')); | ||
} | ||
else { | ||
r.responsePromise.resolve(); | ||
} | ||
} | ||
}); | ||
this._closed = true; | ||
} | ||
/** | ||
* Gets the default buffer time in ms. | ||
@@ -85,6 +124,20 @@ * | ||
add({ ackId }, deadline) { | ||
if (this._closed) { | ||
if (this._subscriber.isExactlyOnceDelivery) { | ||
throw new subscriber_1.AckError(subscriber_1.AckResponses.Invalid, 'Subscriber closed'); | ||
} | ||
else { | ||
return Promise.resolve(); | ||
} | ||
} | ||
const { maxMessages, maxMilliseconds } = this._options; | ||
this._requests.push([ackId, deadline]); | ||
this.numPendingRequests += 1; | ||
this.numInFlightRequests += 1; | ||
const responsePromise = defer(); | ||
this._requests.push({ | ||
ackId, | ||
deadline, | ||
responsePromise, | ||
retryCount: 0, | ||
}); | ||
this.numPendingRequests++; | ||
this.numInFlightRequests++; | ||
if (this._requests.length >= maxMessages) { | ||
@@ -96,4 +149,37 @@ this.flush(); | ||
} | ||
return responsePromise.promise; | ||
} | ||
/** | ||
* Retry handler for acks/modacks that have transient failures. Unless | ||
* it's passed the final deadline, we will just re-queue it for sending. | ||
* | ||
* @private | ||
*/ | ||
handleRetry(message, totalTime) { | ||
var _a; | ||
// Has it been too long? | ||
if (totalTime.totalOf('minute') >= 10 || this.shouldFailEarly(message)) { | ||
(_a = message.responsePromise) === null || _a === void 0 ? void 0 : _a.reject(new subscriber_1.AckError(subscriber_1.AckResponses.Invalid, 'Retried for too long')); | ||
return; | ||
} | ||
// Just throw it in for another round of processing on the next batch. | ||
this._requests.push(message); | ||
this.numPendingRequests++; | ||
this.numInFlightRequests++; | ||
this.numInRetryRequests--; | ||
// Make sure we actually do have another batch scheduled. | ||
if (!this._timer) { | ||
this._timer = setTimeout(() => this.flush(), this._options.maxMilliseconds); | ||
} | ||
} | ||
/** | ||
* This hook lets a subclass tell the retry handler to go ahead and fail early. | ||
* | ||
* @private | ||
*/ | ||
shouldFailEarly(message) { | ||
message; | ||
return false; | ||
} | ||
/** | ||
* Sends a batch of messages. | ||
@@ -114,3 +200,9 @@ * @private | ||
try { | ||
await this._sendBatch(batch); | ||
const toRetry = await this._sendBatch(batch); | ||
// We'll get back anything that needs a retry for transient errors. | ||
for (const m of toRetry) { | ||
this.numInRetryRequests++; | ||
m.retryCount++; | ||
this._retrier.retryLater(m, this.handleRetry.bind(this)); | ||
} | ||
} | ||
@@ -127,3 +219,5 @@ catch (e) { | ||
} | ||
if (this.numInFlightRequests <= 0 && this._onDrain) { | ||
if (this.numInFlightRequests <= 0 && | ||
this.numInRetryRequests <= 0 && | ||
this._onDrain) { | ||
this._onDrain.resolve(); | ||
@@ -164,2 +258,111 @@ delete this._onDrain; | ||
} | ||
/** | ||
* Succeed a whole batch of Acks/Modacks for an OK RPC response. | ||
* | ||
* @private | ||
*/ | ||
handleAckSuccesses(batch) { | ||
// Everyone gets a resolve! | ||
batch.forEach(({ responsePromise }) => { | ||
responsePromise === null || responsePromise === void 0 ? void 0 : responsePromise.resolve(); | ||
}); | ||
} | ||
/** | ||
* If we get an RPC failure of any kind, this will take care of deciding | ||
* what to do for each related ack/modAck. Successful ones will have their | ||
* Promises resolved, permanent errors will have their Promises rejected, | ||
* and transients will be returned for retry. | ||
* | ||
* Note that this is only used for subscriptions with exactly-once | ||
* delivery enabled, so _sendBatch() in the classes below take care of | ||
* resolving errors to success; they don't make it here. | ||
* | ||
* @private | ||
*/ | ||
handleAckFailures(operation, batch, rpcError) { | ||
const toSucceed = []; | ||
const toRetry = []; | ||
const toError = new Map([ | ||
[subscriber_1.AckResponses.PermissionDenied, []], | ||
[subscriber_1.AckResponses.FailedPrecondition, []], | ||
[subscriber_1.AckResponses.Other, []], | ||
]); | ||
// Parse any error codes, both for the RPC call and the ErrorInfo. | ||
const error = rpcError.code | ||
? (0, ack_metadata_1.processAckRpcError)(rpcError.code) | ||
: undefined; | ||
const codes = (0, ack_metadata_1.processAckErrorInfo)(rpcError); | ||
for (const m of batch) { | ||
if (codes.has(m.ackId)) { | ||
// This ack has an ErrorInfo entry, so use that to route it. | ||
const code = codes.get(m.ackId); | ||
if (code.transient) { | ||
// Transient errors get retried. | ||
toRetry.push(m); | ||
} | ||
else { | ||
// It's a permanent error. | ||
(0, util_1.addToBucket)(toError, code.response, m); | ||
} | ||
} | ||
else if (error !== undefined) { | ||
// This ack doesn't have an ErrorInfo entry, but we do have an RPC | ||
// error, so use that to route it. | ||
if (error.transient) { | ||
toRetry.push(m); | ||
} | ||
else { | ||
(0, util_1.addToBucket)(toError, error.response, m); | ||
} | ||
} | ||
else { | ||
// Looks like this one worked out. | ||
toSucceed.push(m); | ||
} | ||
} | ||
// To remain consistent with previous behaviour, we will push a debug | ||
// stream message if an unknown error happens during ack. | ||
const others = toError.get(subscriber_1.AckResponses.Other); | ||
if (others === null || others === void 0 ? void 0 : others.length) { | ||
const otherIds = others.map(e => e.ackId); | ||
this._subscriber.emit('debug', new BatchError(rpcError, otherIds, operation)); | ||
} | ||
// Take care of following up on all the Promises. | ||
toSucceed.forEach(m => { | ||
var _a; | ||
(_a = m.responsePromise) === null || _a === void 0 ? void 0 : _a.resolve(); | ||
}); | ||
for (const e of toError.entries()) { | ||
e[1].forEach(m => { | ||
var _a; | ||
const exc = new subscriber_1.AckError(e[0], rpcError.message); | ||
(_a = m.responsePromise) === null || _a === void 0 ? void 0 : _a.reject(exc); | ||
}); | ||
} | ||
return { | ||
toError, | ||
toRetry, | ||
}; | ||
} | ||
/** | ||
* Since we handle our own retries for ack/modAck calls when exactly-once | ||
* delivery is enabled on a subscription, we conditionally need to disable | ||
* the gax retries. This returns an appropriate CallOptions for the | ||
* subclasses to pass down. | ||
* | ||
* @private | ||
*/ | ||
getCallOptions() { | ||
let callOptions = this._options.callOptions; | ||
if (this._subscriber.isExactlyOnceDelivery) { | ||
// If exactly-once-delivery is enabled, tell gax not to do retries for us. | ||
callOptions = Object.assign({}, callOptions !== null && callOptions !== void 0 ? callOptions : {}); | ||
callOptions.retry = new google_gax_1.RetryOptions([], { | ||
initialRetryDelayMillis: 0, | ||
retryDelayMultiplier: 0, | ||
maxRetryDelayMillis: 0, | ||
}); | ||
} | ||
return callOptions; | ||
} | ||
} | ||
@@ -184,9 +387,38 @@ exports.MessageQueue = MessageQueue; | ||
const client = await this._subscriber.getClient(); | ||
const ackIds = batch.map(([ackId]) => ackId); | ||
const ackIds = batch.map(({ ackId }) => ackId); | ||
const reqOpts = { subscription: this._subscriber.name, ackIds }; | ||
try { | ||
await client.acknowledge(reqOpts, this._options.callOptions); | ||
await client.acknowledge(reqOpts, this.getCallOptions()); | ||
// It's okay if these pass through since they're successful anyway. | ||
this.handleAckSuccesses(batch); | ||
return []; | ||
} | ||
catch (e) { | ||
throw new BatchError(e, ackIds, 'acknowledge'); | ||
// If exactly-once delivery isn't enabled, don't do error processing. We'll | ||
// emulate previous behaviour by resolving all pending Promises with | ||
// a success status, and then throwing a BatchError for debug logging. | ||
if (!this._subscriber.isExactlyOnceDelivery) { | ||
batch.forEach(m => { | ||
var _a; | ||
(_a = m.responsePromise) === null || _a === void 0 ? void 0 : _a.resolve(); | ||
}); | ||
throw new BatchError(e, ackIds, 'ack'); | ||
} | ||
else { | ||
const grpcError = e; | ||
try { | ||
const results = this.handleAckFailures('ack', batch, grpcError); | ||
return results.toRetry; | ||
} | ||
catch (e) { | ||
// This should only ever happen if there's a code failure. | ||
this._subscriber.emit('debug', e); | ||
const exc = new subscriber_1.AckError(subscriber_1.AckResponses.Other, 'Code error'); | ||
batch.forEach(m => { | ||
var _a; | ||
(_a = m.responsePromise) === null || _a === void 0 ? void 0 : _a.reject(exc); | ||
}); | ||
return []; | ||
} | ||
} | ||
} | ||
@@ -215,24 +447,52 @@ } | ||
const subscription = this._subscriber.name; | ||
const modAckTable = batch.reduce((table, [ackId, deadline]) => { | ||
if (!table[deadline]) { | ||
table[deadline] = []; | ||
const modAckTable = batch.reduce((table, message) => { | ||
if (!table[message.deadline]) { | ||
table[message.deadline] = []; | ||
} | ||
table[deadline].push(ackId); | ||
table[message.deadline].push(message); | ||
return table; | ||
}, {}); | ||
const callOptions = this.getCallOptions(); | ||
const modAckRequests = Object.keys(modAckTable).map(async (deadline) => { | ||
const ackIds = modAckTable[deadline]; | ||
const messages = modAckTable[deadline]; | ||
const ackIds = messages.map(m => m.ackId); | ||
const ackDeadlineSeconds = Number(deadline); | ||
const reqOpts = { subscription, ackIds, ackDeadlineSeconds }; | ||
try { | ||
await client.modifyAckDeadline(reqOpts, this._options.callOptions); | ||
await client.modifyAckDeadline(reqOpts, callOptions); | ||
// It's okay if these pass through since they're successful anyway. | ||
this.handleAckSuccesses(messages); | ||
return []; | ||
} | ||
catch (e) { | ||
throw new BatchError(e, ackIds, 'modifyAckDeadline'); | ||
// If exactly-once delivery isn't enabled, don't do error processing. We'll | ||
// emulate previous behaviour by resolving all pending Promises with | ||
// a success status, and then throwing a BatchError for debug logging. | ||
if (!this._subscriber.isExactlyOnceDelivery) { | ||
batch.forEach(m => { | ||
var _a; | ||
(_a = m.responsePromise) === null || _a === void 0 ? void 0 : _a.resolve(); | ||
}); | ||
throw new BatchError(e, ackIds, 'modAck'); | ||
} | ||
else { | ||
const grpcError = e; | ||
const newBatch = this.handleAckFailures('modAck', messages, grpcError); | ||
return newBatch.toRetry; | ||
} | ||
} | ||
}); | ||
await Promise.all(modAckRequests); | ||
// This catches the sub-failures and bubbles up anything we need to bubble. | ||
const allNewBatches = await Promise.all(modAckRequests); | ||
return allNewBatches.reduce((p, c) => [ | ||
...(p !== null && p !== void 0 ? p : []), | ||
...c, | ||
]); | ||
} | ||
// For modacks only, we'll stop retrying after 3 tries. | ||
shouldFailEarly(message) { | ||
return message.retryCount >= 3; | ||
} | ||
} | ||
exports.ModAckQueue = ModAckQueue; | ||
//# sourceMappingURL=message-queues.js.map |
@@ -20,2 +20,3 @@ /*! | ||
import { Subscriber } from './subscriber'; | ||
import { Duration } from './temporal'; | ||
/** | ||
@@ -79,2 +80,8 @@ * Error wrapper for gRPC status objects. | ||
/** | ||
* Updates the stream ack deadline with the server. | ||
* | ||
* @param {Duration} deadline The new deadline value to set. | ||
*/ | ||
setStreamAckDeadline(deadline: Duration): void; | ||
/** | ||
* Destroys the stream and any underlying streams. | ||
@@ -81,0 +88,0 @@ * |
@@ -106,2 +106,16 @@ "use strict"; | ||
/** | ||
* Updates the stream ack deadline with the server. | ||
* | ||
* @param {Duration} deadline The new deadline value to set. | ||
*/ | ||
setStreamAckDeadline(deadline) { | ||
const request = { | ||
streamAckDeadlineSeconds: deadline.totalOf('second'), | ||
}; | ||
for (const stream of this._streams.keys()) { | ||
// We don't need a callback on this one, it's advisory. | ||
stream.write(request); | ||
} | ||
} | ||
/** | ||
* Destroys the stream and any underlying streams. | ||
@@ -108,0 +122,0 @@ * |
@@ -70,6 +70,15 @@ "use strict"; | ||
const definedCallback = callback ? callback : () => { }; | ||
const publishes = [(0, promisify_1.promisify)(this.queue.publish).bind(this.queue)()]; | ||
Array.from(this.orderedQueues.values()).forEach(q => publishes.push((0, promisify_1.promisify)(q.publish).bind(q)())); | ||
const allPublishes = Promise.all(publishes); | ||
const toDrain = [this.queue, ...Array.from(this.orderedQueues.values())]; | ||
const allDrains = Promise.all(toDrain.map(q => new Promise(resolve => { | ||
const flushResolver = () => { | ||
resolve(); | ||
// flush() maybe called more than once, so remove these | ||
// event listeners after we've completed flush(). | ||
q.removeListener('drain', flushResolver); | ||
}; | ||
return q.on('drain', flushResolver); | ||
}))); | ||
const allPublishes = Promise.all(toDrain.map(q => (0, promisify_1.promisify)(q.publish).bind(q)())); | ||
allPublishes | ||
.then(() => allDrains) | ||
.then(() => { | ||
@@ -76,0 +85,0 @@ definedCallback(null); |
@@ -93,2 +93,4 @@ /*! | ||
* Cancels any pending publishes and calls _publish immediately. | ||
* | ||
* @emits Queue#drain when all messages are sent. | ||
*/ | ||
@@ -95,0 +97,0 @@ publish(callback?: PublishDone): void; |
@@ -120,4 +120,7 @@ "use strict"; | ||
* Cancels any pending publishes and calls _publish immediately. | ||
* | ||
* @emits Queue#drain when all messages are sent. | ||
*/ | ||
publish(callback) { | ||
const definedCallback = callback || (() => { }); | ||
const { messages, callbacks } = this.batch; | ||
@@ -129,3 +132,15 @@ this.batch = new message_batch_1.MessageBatch(this.batchOptions); | ||
} | ||
this._publish(messages, callbacks, callback); | ||
this._publish(messages, callbacks, (err) => { | ||
if (err) { | ||
definedCallback(err); | ||
} | ||
else if (this.batch.messages.length) { | ||
// Make another go-around, we're trying to drain the queues fully. | ||
this.publish(callback); | ||
} | ||
else { | ||
this.emit('drain'); | ||
definedCallback(null); | ||
} | ||
}); | ||
} | ||
@@ -132,0 +147,0 @@ } |
@@ -29,3 +29,21 @@ /*! | ||
export declare type SubscriptionProperties = google.pubsub.v1.StreamingPullResponse.ISubscriptionProperties; | ||
declare type ValueOf<T> = T[keyof T]; | ||
export declare const AckResponses: { | ||
PermissionDenied: "PERMISSION_DENIED"; | ||
FailedPrecondition: "FAILED_PRECONDITION"; | ||
Success: "SUCCESS"; | ||
Invalid: "INVALID"; | ||
Other: "OTHER"; | ||
}; | ||
export declare type AckResponse = ValueOf<typeof AckResponses>; | ||
/** | ||
* Thrown when an error is detected in an ack/nack/modack call, when | ||
* exactly-once delivery is enabled on the subscription. This will | ||
* only be thrown for actual errors that can't be retried. | ||
*/ | ||
export declare class AckError extends Error { | ||
errorCode: AckResponse; | ||
constructor(errorCode: AckResponse, message?: string); | ||
} | ||
/** | ||
* Date object with nanosecond precision. Supports all standard Date arguments | ||
@@ -96,2 +114,14 @@ * in addition to several custom types. | ||
/** | ||
* Acknowledges the message, expecting a response (for exactly-once delivery subscriptions). | ||
* If exactly-once delivery is not enabled, this will immediately resolve successfully. | ||
* | ||
* @example | ||
* ``` | ||
* subscription.on('message', async (message) => { | ||
* const response = await message.ackWithResponse(); | ||
* }); | ||
* ``` | ||
*/ | ||
ackWithResponse(): Promise<AckResponse>; | ||
/** | ||
* Modifies the ack deadline. | ||
@@ -104,2 +134,10 @@ * | ||
/** | ||
* Modifies the ack deadline, expecting a response (for exactly-once delivery subscriptions). | ||
* If exactly-once delivery is not enabled, this will immediately resolve successfully. | ||
* | ||
* @param {number} deadline The number of seconds to extend the deadline. | ||
* @private | ||
*/ | ||
modAckWithResponse(deadline: number): Promise<AckResponse>; | ||
/** | ||
* Removes the message from our inventory and schedules it to be redelivered. | ||
@@ -115,2 +153,15 @@ * | ||
nack(): void; | ||
/** | ||
* Removes the message from our inventory and schedules it to be redelivered, | ||
* with the modAck response being returned (for exactly-once delivery subscriptions). | ||
* If exactly-once delivery is not enabled, this will immediately resolve successfully. | ||
* | ||
* @example | ||
* ``` | ||
* subscription.on('message', async (message) => { | ||
* const response = await message.nackWithResponse(); | ||
* }); | ||
* ``` | ||
*/ | ||
nackWithResponse(): Promise<AckResponse>; | ||
} | ||
@@ -172,3 +223,2 @@ /** | ||
private _subscription; | ||
private _errorLog; | ||
subscriptionProperties?: SubscriptionProperties; | ||
@@ -192,7 +242,7 @@ constructor(subscription: Subscription, options?: {}); | ||
/** | ||
* Returns true if an exactly once subscription has been detected. | ||
* Returns true if an exactly-once delivery subscription has been detected. | ||
* | ||
* @private | ||
*/ | ||
get isExactlyOnce(): boolean; | ||
get isExactlyOnceDelivery(): boolean; | ||
/** | ||
@@ -223,3 +273,3 @@ * Sets our subscription properties from incoming messages. | ||
* @param {Message} message The message to acknowledge. | ||
* @returns {Promise} | ||
* @returns {Promise<void>} | ||
* @private | ||
@@ -229,2 +279,11 @@ */ | ||
/** | ||
* Acknowledges the supplied message, expecting a response (for exactly | ||
* once subscriptions). | ||
* | ||
* @param {Message} message The message to acknowledge. | ||
* @returns {Promise<AckResponse>} | ||
* @private | ||
*/ | ||
ackWithResponse(message: Message): Promise<AckResponse>; | ||
/** | ||
* Closes the subscriber. The returned promise will resolve once any pending | ||
@@ -249,3 +308,3 @@ * acks/modAcks are finished. | ||
* @param {number} deadline The deadline. | ||
* @returns {Promise} | ||
* @returns {Promise<void>} | ||
* @private | ||
@@ -255,2 +314,12 @@ */ | ||
/** | ||
* Modifies the acknowledge deadline for the provided message, expecting | ||
* a reply (for exactly-once delivery subscriptions). | ||
* | ||
* @param {Message} message The message to modify. | ||
* @param {number} deadline The deadline. | ||
* @returns {Promise<AckResponse>} | ||
* @private | ||
*/ | ||
modAckWithResponse(message: Message, deadline: number): Promise<AckResponse>; | ||
/** | ||
* Modfies the acknowledge deadline for the provided message and then removes | ||
@@ -260,3 +329,3 @@ * it from our inventory. | ||
* @param {Message} message The message. | ||
* @return {Promise} | ||
* @return {Promise<void>} | ||
* @private | ||
@@ -266,2 +335,12 @@ */ | ||
/** | ||
* Modfies the acknowledge deadline for the provided message and then removes | ||
* it from our inventory, expecting a response from modAck (for | ||
* exactly-once delivery subscriptions). | ||
* | ||
* @param {Message} message The message. | ||
* @return {Promise<AckResponse>} | ||
* @private | ||
*/ | ||
nackWithResponse(message: Message): Promise<AckResponse>; | ||
/** | ||
* Starts pulling messages. | ||
@@ -312,1 +391,2 @@ * @private | ||
} | ||
export {}; |
@@ -18,3 +18,3 @@ "use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
exports.Subscriber = exports.Message = void 0; | ||
exports.Subscriber = exports.Message = exports.AckError = exports.AckResponses = void 0; | ||
const precise_date_1 = require("@google-cloud/precise-date"); | ||
@@ -32,5 +32,27 @@ const projectify_1 = require("@google-cloud/projectify"); | ||
const opentelemetry_tracing_1 = require("./opentelemetry-tracing"); | ||
const util_1 = require("./util"); | ||
const temporal_1 = require("./temporal"); | ||
exports.AckResponses = { | ||
PermissionDenied: 'PERMISSION_DENIED', | ||
FailedPrecondition: 'FAILED_PRECONDITION', | ||
Success: 'SUCCESS', | ||
Invalid: 'INVALID', | ||
Other: 'OTHER', | ||
}; | ||
/** | ||
* Thrown when an error is detected in an ack/nack/modack call, when | ||
* exactly-once delivery is enabled on the subscription. This will | ||
* only be thrown for actual errors that can't be retried. | ||
*/ | ||
class AckError extends Error { | ||
constructor(errorCode, message) { | ||
let finalMessage = `${errorCode}`; | ||
if (message) { | ||
finalMessage += ` : ${message}`; | ||
} | ||
super(finalMessage); | ||
this.errorCode = errorCode; | ||
} | ||
} | ||
exports.AckError = AckError; | ||
/** | ||
* Date object with nanosecond precision. Supports all standard Date arguments | ||
@@ -165,2 +187,26 @@ * in addition to several custom types. | ||
/** | ||
* Acknowledges the message, expecting a response (for exactly-once delivery subscriptions). | ||
* If exactly-once delivery is not enabled, this will immediately resolve successfully. | ||
* | ||
* @example | ||
* ``` | ||
* subscription.on('message', async (message) => { | ||
* const response = await message.ackWithResponse(); | ||
* }); | ||
* ``` | ||
*/ | ||
async ackWithResponse() { | ||
if (!this._subscriber.isExactlyOnceDelivery) { | ||
this.ack(); | ||
return exports.AckResponses.Success; | ||
} | ||
if (!this._handled) { | ||
this._handled = true; | ||
return await this._subscriber.ackWithResponse(this); | ||
} | ||
else { | ||
return exports.AckResponses.Invalid; | ||
} | ||
} | ||
/** | ||
* Modifies the ack deadline. | ||
@@ -177,2 +223,21 @@ * | ||
/** | ||
* Modifies the ack deadline, expecting a response (for exactly-once delivery subscriptions). | ||
* If exactly-once delivery is not enabled, this will immediately resolve successfully. | ||
* | ||
* @param {number} deadline The number of seconds to extend the deadline. | ||
* @private | ||
*/ | ||
async modAckWithResponse(deadline) { | ||
if (!this._subscriber.isExactlyOnceDelivery) { | ||
this.modAck(deadline); | ||
return exports.AckResponses.Success; | ||
} | ||
if (!this._handled) { | ||
return await this._subscriber.modAckWithResponse(this, deadline); | ||
} | ||
else { | ||
return exports.AckResponses.Invalid; | ||
} | ||
} | ||
/** | ||
* Removes the message from our inventory and schedules it to be redelivered. | ||
@@ -193,5 +258,30 @@ * | ||
} | ||
/** | ||
* Removes the message from our inventory and schedules it to be redelivered, | ||
* with the modAck response being returned (for exactly-once delivery subscriptions). | ||
* If exactly-once delivery is not enabled, this will immediately resolve successfully. | ||
* | ||
* @example | ||
* ``` | ||
* subscription.on('message', async (message) => { | ||
* const response = await message.nackWithResponse(); | ||
* }); | ||
* ``` | ||
*/ | ||
async nackWithResponse() { | ||
if (!this._subscriber.isExactlyOnceDelivery) { | ||
this.nack(); | ||
return exports.AckResponses.Success; | ||
} | ||
if (!this._handled) { | ||
this._handled = true; | ||
return await this._subscriber.nackWithResponse(this); | ||
} | ||
else { | ||
return exports.AckResponses.Invalid; | ||
} | ||
} | ||
} | ||
exports.Message = Message; | ||
const minAckDeadlineForExactlyOnce = temporal_1.Duration.from({ seconds: 60 }); | ||
const minAckDeadlineForExactlyOnceDelivery = temporal_1.Duration.from({ seconds: 60 }); | ||
/** | ||
@@ -218,3 +308,2 @@ * Subscriber class is used to manage all message related functionality. | ||
this._subscription = subscription; | ||
this._errorLog = new util_1.Throttler(60 * 1000); | ||
this.setOptions(options); | ||
@@ -244,3 +333,3 @@ } | ||
// Grab our current min/max deadline values, based on whether exactly-once | ||
// is enabled, and the defaults. | ||
// delivery is enabled, and the defaults. | ||
const [minDeadline, maxDeadline] = this.getMinMaxDeadlines(); | ||
@@ -258,6 +347,7 @@ if (minDeadline) { | ||
var _a, _b; | ||
// If this is an exactly-once subscription, and the user didn't set their | ||
// own minimum ack periods, set it to the default for exactly-once. | ||
const defaultMinDeadline = this.isExactlyOnce | ||
? minAckDeadlineForExactlyOnce | ||
// If this is an exactly-once delivery subscription, and the user | ||
// didn't set their own minimum ack periods, set it to the default | ||
// for exactly-once delivery. | ||
const defaultMinDeadline = this.isExactlyOnceDelivery | ||
? minAckDeadlineForExactlyOnceDelivery | ||
: default_options_1.defaultOptions.subscription.minAckDeadline; | ||
@@ -271,7 +361,7 @@ const defaultMaxDeadline = default_options_1.defaultOptions.subscription.maxAckDeadline; | ||
/** | ||
* Returns true if an exactly once subscription has been detected. | ||
* Returns true if an exactly-once delivery subscription has been detected. | ||
* | ||
* @private | ||
*/ | ||
get isExactlyOnce() { | ||
get isExactlyOnceDelivery() { | ||
if (!this.subscriptionProperties) { | ||
@@ -289,14 +379,13 @@ return false; | ||
setSubscriptionProperties(subscriptionProperties) { | ||
const previouslyEnabled = this.isExactlyOnce; | ||
const previouslyEnabled = this.isExactlyOnceDelivery; | ||
this.subscriptionProperties = subscriptionProperties; | ||
// If this is an exactly-once subscription... | ||
if (this.subscriptionProperties.exactlyOnceDeliveryEnabled) { | ||
// Warn the user that they may have difficulty. | ||
this._errorLog.doMaybe(() => console.error('WARNING: Exactly-once subscriptions are not yet supported ' + | ||
'by the Node client library. This feature will be added ' + | ||
'in a future release.')); | ||
} | ||
// Update ackDeadline in case the flag switched. | ||
if (previouslyEnabled !== this.isExactlyOnce) { | ||
if (previouslyEnabled !== this.isExactlyOnceDelivery) { | ||
this.updateAckDeadline(); | ||
// For exactly-once delivery, make sure the subscription ack deadline is 60. | ||
// (Otherwise fall back to the default of 10 seconds.) | ||
const subscriptionAckDeadlineSeconds = this.isExactlyOnceDelivery | ||
? 60 | ||
: 10; | ||
this._stream.setStreamAckDeadline(temporal_1.Duration.from({ seconds: subscriptionAckDeadlineSeconds })); | ||
} | ||
@@ -335,3 +424,3 @@ } | ||
* @param {Message} message The message to acknowledge. | ||
* @returns {Promise} | ||
* @returns {Promise<void>} | ||
* @private | ||
@@ -342,3 +431,7 @@ */ | ||
this.updateAckDeadline(ackTimeSeconds); | ||
this._acks.add(message); | ||
// Ignore this in this version of the method (but hook then/catch | ||
// to avoid unhandled exceptions). | ||
const resultPromise = this._acks.add(message); | ||
resultPromise.then(() => { }); | ||
resultPromise.catch(() => { }); | ||
await this._acks.onFlush(); | ||
@@ -348,2 +441,18 @@ this._inventory.remove(message); | ||
/** | ||
* Acknowledges the supplied message, expecting a response (for exactly | ||
* once subscriptions). | ||
* | ||
* @param {Message} message The message to acknowledge. | ||
* @returns {Promise<AckResponse>} | ||
* @private | ||
*/ | ||
async ackWithResponse(message) { | ||
const ackTimeSeconds = (Date.now() - message.received) / 1000; | ||
this.updateAckDeadline(ackTimeSeconds); | ||
await this._acks.add(message); | ||
this._inventory.remove(message); | ||
// No exception means Success. | ||
return exports.AckResponses.Success; | ||
} | ||
/** | ||
* Closes the subscriber. The returned promise will resolve once any pending | ||
@@ -364,2 +473,4 @@ * acks/modAcks are finished. | ||
this.emit('close'); | ||
this._acks.close(); | ||
this._modAcks.close(); | ||
} | ||
@@ -384,3 +495,3 @@ /** | ||
* @param {number} deadline The deadline. | ||
* @returns {Promise} | ||
* @returns {Promise<void>} | ||
* @private | ||
@@ -390,3 +501,5 @@ */ | ||
const startTime = Date.now(); | ||
this._modAcks.add(message, deadline); | ||
const responsePromise = this._modAcks.add(message, deadline); | ||
responsePromise.then(() => { }); | ||
responsePromise.catch(() => { }); | ||
await this._modAcks.onFlush(); | ||
@@ -397,2 +510,19 @@ const latency = (Date.now() - startTime) / 1000; | ||
/** | ||
* Modifies the acknowledge deadline for the provided message, expecting | ||
* a reply (for exactly-once delivery subscriptions). | ||
* | ||
* @param {Message} message The message to modify. | ||
* @param {number} deadline The deadline. | ||
* @returns {Promise<AckResponse>} | ||
* @private | ||
*/ | ||
async modAckWithResponse(message, deadline) { | ||
const startTime = Date.now(); | ||
await this._modAcks.add(message, deadline); | ||
const latency = (Date.now() - startTime) / 1000; | ||
this._latencies.add(latency); | ||
// No exception means Success. | ||
return exports.AckResponses.Success; | ||
} | ||
/** | ||
* Modfies the acknowledge deadline for the provided message and then removes | ||
@@ -402,3 +532,3 @@ * it from our inventory. | ||
* @param {Message} message The message. | ||
* @return {Promise} | ||
* @return {Promise<void>} | ||
* @private | ||
@@ -411,2 +541,14 @@ */ | ||
/** | ||
* Modfies the acknowledge deadline for the provided message and then removes | ||
* it from our inventory, expecting a response from modAck (for | ||
* exactly-once delivery subscriptions). | ||
* | ||
* @param {Message} message The message. | ||
* @return {Promise<AckResponse>} | ||
* @private | ||
*/ | ||
async nackWithResponse(message) { | ||
return await this.modAckWithResponse(message, 0); | ||
} | ||
/** | ||
* Starts pulling messages. | ||
@@ -528,3 +670,3 @@ * @private | ||
_onData(response) { | ||
// Grab the subscription properties for exactly once and ordering flags. | ||
// Grab the subscription properties for exactly-once delivery and ordering flags. | ||
if (response.subscriptionProperties) { | ||
@@ -531,0 +673,0 @@ this.setSubscriptionProperties(response.subscriptionProperties); |
@@ -26,2 +26,3 @@ /*! | ||
import { Topic } from './topic'; | ||
export { AckError, AckResponse, AckResponses } from './subscriber'; | ||
export declare type PushConfig = google.pubsub.v1.IPushConfig; | ||
@@ -740,2 +741,1 @@ export declare type OidcToken = google.pubsub.v1.PushConfig.IOidcToken; | ||
} | ||
export {}; |
@@ -18,3 +18,3 @@ "use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
exports.Subscription = void 0; | ||
exports.Subscription = exports.AckResponses = exports.AckError = void 0; | ||
const events_1 = require("events"); | ||
@@ -27,2 +27,5 @@ const extend = require("extend"); | ||
const util_1 = require("./util"); | ||
var subscriber_2 = require("./subscriber"); | ||
Object.defineProperty(exports, "AckError", { enumerable: true, get: function () { return subscriber_2.AckError; } }); | ||
Object.defineProperty(exports, "AckResponses", { enumerable: true, get: function () { return subscriber_2.AckResponses; } }); | ||
// JSDoc won't see these, so this is just to let you get typings | ||
@@ -29,0 +32,0 @@ // in your editor of choice. |
@@ -46,1 +46,7 @@ /*! | ||
} | ||
/** | ||
* Takes care of managing a Map of buckets to the bucket arrays themselves. | ||
* | ||
* @private | ||
*/ | ||
export declare function addToBucket<T, U>(map: Map<T, U[]>, bucket: T, item: U): void; |
@@ -18,3 +18,3 @@ "use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
exports.Throttler = exports.noop = exports.promisifySome = void 0; | ||
exports.addToBucket = exports.Throttler = exports.noop = exports.promisifySome = void 0; | ||
const promisify_1 = require("@google-cloud/promisify"); | ||
@@ -67,2 +67,14 @@ /** | ||
exports.Throttler = Throttler; | ||
/** | ||
* Takes care of managing a Map of buckets to the bucket arrays themselves. | ||
* | ||
* @private | ||
*/ | ||
function addToBucket(map, bucket, item) { | ||
var _a; | ||
const items = (_a = map.get(bucket)) !== null && _a !== void 0 ? _a : []; | ||
items.push(item); | ||
map.set(bucket, items); | ||
} | ||
exports.addToBucket = addToBucket; | ||
//# sourceMappingURL=util.js.map |
@@ -271,2 +271,3 @@ "use strict"; | ||
createTopic(request, optionsOrCallback, callback) { | ||
var _a; | ||
request = request || {}; | ||
@@ -286,3 +287,3 @@ let options; | ||
this._gaxModule.routingHeader.fromParams({ | ||
name: request.name || '', | ||
name: (_a = request.name) !== null && _a !== void 0 ? _a : '', | ||
}); | ||
@@ -293,2 +294,3 @@ this.initialize(); | ||
updateTopic(request, optionsOrCallback, callback) { | ||
var _a; | ||
request = request || {}; | ||
@@ -308,3 +310,3 @@ let options; | ||
this._gaxModule.routingHeader.fromParams({ | ||
'topic.name': request.topic.name || '', | ||
'topic.name': (_a = request.topic.name) !== null && _a !== void 0 ? _a : '', | ||
}); | ||
@@ -315,2 +317,3 @@ this.initialize(); | ||
publish(request, optionsOrCallback, callback) { | ||
var _a; | ||
request = request || {}; | ||
@@ -330,3 +333,3 @@ let options; | ||
this._gaxModule.routingHeader.fromParams({ | ||
topic: request.topic || '', | ||
topic: (_a = request.topic) !== null && _a !== void 0 ? _a : '', | ||
}); | ||
@@ -337,2 +340,3 @@ this.initialize(); | ||
getTopic(request, optionsOrCallback, callback) { | ||
var _a; | ||
request = request || {}; | ||
@@ -352,3 +356,3 @@ let options; | ||
this._gaxModule.routingHeader.fromParams({ | ||
topic: request.topic || '', | ||
topic: (_a = request.topic) !== null && _a !== void 0 ? _a : '', | ||
}); | ||
@@ -359,2 +363,3 @@ this.initialize(); | ||
deleteTopic(request, optionsOrCallback, callback) { | ||
var _a; | ||
request = request || {}; | ||
@@ -374,3 +379,3 @@ let options; | ||
this._gaxModule.routingHeader.fromParams({ | ||
topic: request.topic || '', | ||
topic: (_a = request.topic) !== null && _a !== void 0 ? _a : '', | ||
}); | ||
@@ -381,2 +386,3 @@ this.initialize(); | ||
detachSubscription(request, optionsOrCallback, callback) { | ||
var _a; | ||
request = request || {}; | ||
@@ -396,3 +402,3 @@ let options; | ||
this._gaxModule.routingHeader.fromParams({ | ||
subscription: request.subscription || '', | ||
subscription: (_a = request.subscription) !== null && _a !== void 0 ? _a : '', | ||
}); | ||
@@ -403,2 +409,3 @@ this.initialize(); | ||
listTopics(request, optionsOrCallback, callback) { | ||
var _a; | ||
request = request || {}; | ||
@@ -418,3 +425,3 @@ let options; | ||
this._gaxModule.routingHeader.fromParams({ | ||
project: request.project || '', | ||
project: (_a = request.project) !== null && _a !== void 0 ? _a : '', | ||
}); | ||
@@ -450,2 +457,3 @@ this.initialize(); | ||
listTopicsStream(request, options) { | ||
var _a; | ||
request = request || {}; | ||
@@ -457,3 +465,3 @@ options = options || {}; | ||
this._gaxModule.routingHeader.fromParams({ | ||
project: request.project || '', | ||
project: (_a = request.project) !== null && _a !== void 0 ? _a : '', | ||
}); | ||
@@ -492,2 +500,3 @@ const defaultCallSettings = this._defaults['listTopics']; | ||
listTopicsAsync(request, options) { | ||
var _a; | ||
request = request || {}; | ||
@@ -499,3 +508,3 @@ options = options || {}; | ||
this._gaxModule.routingHeader.fromParams({ | ||
project: request.project || '', | ||
project: (_a = request.project) !== null && _a !== void 0 ? _a : '', | ||
}); | ||
@@ -508,2 +517,3 @@ const defaultCallSettings = this._defaults['listTopics']; | ||
listTopicSubscriptions(request, optionsOrCallback, callback) { | ||
var _a; | ||
request = request || {}; | ||
@@ -523,3 +533,3 @@ let options; | ||
this._gaxModule.routingHeader.fromParams({ | ||
topic: request.topic || '', | ||
topic: (_a = request.topic) !== null && _a !== void 0 ? _a : '', | ||
}); | ||
@@ -555,2 +565,3 @@ this.initialize(); | ||
listTopicSubscriptionsStream(request, options) { | ||
var _a; | ||
request = request || {}; | ||
@@ -562,3 +573,3 @@ options = options || {}; | ||
this._gaxModule.routingHeader.fromParams({ | ||
topic: request.topic || '', | ||
topic: (_a = request.topic) !== null && _a !== void 0 ? _a : '', | ||
}); | ||
@@ -597,2 +608,3 @@ const defaultCallSettings = this._defaults['listTopicSubscriptions']; | ||
listTopicSubscriptionsAsync(request, options) { | ||
var _a; | ||
request = request || {}; | ||
@@ -604,3 +616,3 @@ options = options || {}; | ||
this._gaxModule.routingHeader.fromParams({ | ||
topic: request.topic || '', | ||
topic: (_a = request.topic) !== null && _a !== void 0 ? _a : '', | ||
}); | ||
@@ -613,2 +625,3 @@ const defaultCallSettings = this._defaults['listTopicSubscriptions']; | ||
listTopicSnapshots(request, optionsOrCallback, callback) { | ||
var _a; | ||
request = request || {}; | ||
@@ -628,3 +641,3 @@ let options; | ||
this._gaxModule.routingHeader.fromParams({ | ||
topic: request.topic || '', | ||
topic: (_a = request.topic) !== null && _a !== void 0 ? _a : '', | ||
}); | ||
@@ -660,2 +673,3 @@ this.initialize(); | ||
listTopicSnapshotsStream(request, options) { | ||
var _a; | ||
request = request || {}; | ||
@@ -667,3 +681,3 @@ options = options || {}; | ||
this._gaxModule.routingHeader.fromParams({ | ||
topic: request.topic || '', | ||
topic: (_a = request.topic) !== null && _a !== void 0 ? _a : '', | ||
}); | ||
@@ -702,2 +716,3 @@ const defaultCallSettings = this._defaults['listTopicSnapshots']; | ||
listTopicSnapshotsAsync(request, options) { | ||
var _a; | ||
request = request || {}; | ||
@@ -709,3 +724,3 @@ options = options || {}; | ||
this._gaxModule.routingHeader.fromParams({ | ||
topic: request.topic || '', | ||
topic: (_a = request.topic) !== null && _a !== void 0 ? _a : '', | ||
}); | ||
@@ -712,0 +727,0 @@ const defaultCallSettings = this._defaults['listTopicSnapshots']; |
@@ -254,2 +254,3 @@ "use strict"; | ||
createSchema(request, optionsOrCallback, callback) { | ||
var _a; | ||
request = request || {}; | ||
@@ -269,3 +270,3 @@ let options; | ||
this._gaxModule.routingHeader.fromParams({ | ||
parent: request.parent || '', | ||
parent: (_a = request.parent) !== null && _a !== void 0 ? _a : '', | ||
}); | ||
@@ -276,2 +277,3 @@ this.initialize(); | ||
getSchema(request, optionsOrCallback, callback) { | ||
var _a; | ||
request = request || {}; | ||
@@ -291,3 +293,3 @@ let options; | ||
this._gaxModule.routingHeader.fromParams({ | ||
name: request.name || '', | ||
name: (_a = request.name) !== null && _a !== void 0 ? _a : '', | ||
}); | ||
@@ -298,2 +300,3 @@ this.initialize(); | ||
deleteSchema(request, optionsOrCallback, callback) { | ||
var _a; | ||
request = request || {}; | ||
@@ -313,3 +316,3 @@ let options; | ||
this._gaxModule.routingHeader.fromParams({ | ||
name: request.name || '', | ||
name: (_a = request.name) !== null && _a !== void 0 ? _a : '', | ||
}); | ||
@@ -320,2 +323,3 @@ this.initialize(); | ||
validateSchema(request, optionsOrCallback, callback) { | ||
var _a; | ||
request = request || {}; | ||
@@ -335,3 +339,3 @@ let options; | ||
this._gaxModule.routingHeader.fromParams({ | ||
parent: request.parent || '', | ||
parent: (_a = request.parent) !== null && _a !== void 0 ? _a : '', | ||
}); | ||
@@ -342,2 +346,3 @@ this.initialize(); | ||
validateMessage(request, optionsOrCallback, callback) { | ||
var _a; | ||
request = request || {}; | ||
@@ -357,3 +362,3 @@ let options; | ||
this._gaxModule.routingHeader.fromParams({ | ||
parent: request.parent || '', | ||
parent: (_a = request.parent) !== null && _a !== void 0 ? _a : '', | ||
}); | ||
@@ -364,2 +369,3 @@ this.initialize(); | ||
listSchemas(request, optionsOrCallback, callback) { | ||
var _a; | ||
request = request || {}; | ||
@@ -379,3 +385,3 @@ let options; | ||
this._gaxModule.routingHeader.fromParams({ | ||
parent: request.parent || '', | ||
parent: (_a = request.parent) !== null && _a !== void 0 ? _a : '', | ||
}); | ||
@@ -415,2 +421,3 @@ this.initialize(); | ||
listSchemasStream(request, options) { | ||
var _a; | ||
request = request || {}; | ||
@@ -422,3 +429,3 @@ options = options || {}; | ||
this._gaxModule.routingHeader.fromParams({ | ||
parent: request.parent || '', | ||
parent: (_a = request.parent) !== null && _a !== void 0 ? _a : '', | ||
}); | ||
@@ -461,2 +468,3 @@ const defaultCallSettings = this._defaults['listSchemas']; | ||
listSchemasAsync(request, options) { | ||
var _a; | ||
request = request || {}; | ||
@@ -468,3 +476,3 @@ options = options || {}; | ||
this._gaxModule.routingHeader.fromParams({ | ||
parent: request.parent || '', | ||
parent: (_a = request.parent) !== null && _a !== void 0 ? _a : '', | ||
}); | ||
@@ -471,0 +479,0 @@ const defaultCallSettings = this._defaults['listSchemas']; |
@@ -282,2 +282,3 @@ "use strict"; | ||
createSubscription(request, optionsOrCallback, callback) { | ||
var _a; | ||
request = request || {}; | ||
@@ -297,3 +298,3 @@ let options; | ||
this._gaxModule.routingHeader.fromParams({ | ||
name: request.name || '', | ||
name: (_a = request.name) !== null && _a !== void 0 ? _a : '', | ||
}); | ||
@@ -304,2 +305,3 @@ this.initialize(); | ||
getSubscription(request, optionsOrCallback, callback) { | ||
var _a; | ||
request = request || {}; | ||
@@ -319,3 +321,3 @@ let options; | ||
this._gaxModule.routingHeader.fromParams({ | ||
subscription: request.subscription || '', | ||
subscription: (_a = request.subscription) !== null && _a !== void 0 ? _a : '', | ||
}); | ||
@@ -326,2 +328,3 @@ this.initialize(); | ||
updateSubscription(request, optionsOrCallback, callback) { | ||
var _a; | ||
request = request || {}; | ||
@@ -341,3 +344,3 @@ let options; | ||
this._gaxModule.routingHeader.fromParams({ | ||
'subscription.name': request.subscription.name || '', | ||
'subscription.name': (_a = request.subscription.name) !== null && _a !== void 0 ? _a : '', | ||
}); | ||
@@ -348,2 +351,3 @@ this.initialize(); | ||
deleteSubscription(request, optionsOrCallback, callback) { | ||
var _a; | ||
request = request || {}; | ||
@@ -363,3 +367,3 @@ let options; | ||
this._gaxModule.routingHeader.fromParams({ | ||
subscription: request.subscription || '', | ||
subscription: (_a = request.subscription) !== null && _a !== void 0 ? _a : '', | ||
}); | ||
@@ -370,2 +374,3 @@ this.initialize(); | ||
modifyAckDeadline(request, optionsOrCallback, callback) { | ||
var _a; | ||
request = request || {}; | ||
@@ -385,3 +390,3 @@ let options; | ||
this._gaxModule.routingHeader.fromParams({ | ||
subscription: request.subscription || '', | ||
subscription: (_a = request.subscription) !== null && _a !== void 0 ? _a : '', | ||
}); | ||
@@ -392,2 +397,3 @@ this.initialize(); | ||
acknowledge(request, optionsOrCallback, callback) { | ||
var _a; | ||
request = request || {}; | ||
@@ -407,3 +413,3 @@ let options; | ||
this._gaxModule.routingHeader.fromParams({ | ||
subscription: request.subscription || '', | ||
subscription: (_a = request.subscription) !== null && _a !== void 0 ? _a : '', | ||
}); | ||
@@ -414,2 +420,3 @@ this.initialize(); | ||
pull(request, optionsOrCallback, callback) { | ||
var _a; | ||
request = request || {}; | ||
@@ -429,3 +436,3 @@ let options; | ||
this._gaxModule.routingHeader.fromParams({ | ||
subscription: request.subscription || '', | ||
subscription: (_a = request.subscription) !== null && _a !== void 0 ? _a : '', | ||
}); | ||
@@ -436,2 +443,3 @@ this.initialize(); | ||
modifyPushConfig(request, optionsOrCallback, callback) { | ||
var _a; | ||
request = request || {}; | ||
@@ -451,3 +459,3 @@ let options; | ||
this._gaxModule.routingHeader.fromParams({ | ||
subscription: request.subscription || '', | ||
subscription: (_a = request.subscription) !== null && _a !== void 0 ? _a : '', | ||
}); | ||
@@ -458,2 +466,3 @@ this.initialize(); | ||
getSnapshot(request, optionsOrCallback, callback) { | ||
var _a; | ||
request = request || {}; | ||
@@ -473,3 +482,3 @@ let options; | ||
this._gaxModule.routingHeader.fromParams({ | ||
snapshot: request.snapshot || '', | ||
snapshot: (_a = request.snapshot) !== null && _a !== void 0 ? _a : '', | ||
}); | ||
@@ -480,2 +489,3 @@ this.initialize(); | ||
createSnapshot(request, optionsOrCallback, callback) { | ||
var _a; | ||
request = request || {}; | ||
@@ -495,3 +505,3 @@ let options; | ||
this._gaxModule.routingHeader.fromParams({ | ||
name: request.name || '', | ||
name: (_a = request.name) !== null && _a !== void 0 ? _a : '', | ||
}); | ||
@@ -502,2 +512,3 @@ this.initialize(); | ||
updateSnapshot(request, optionsOrCallback, callback) { | ||
var _a; | ||
request = request || {}; | ||
@@ -517,3 +528,3 @@ let options; | ||
this._gaxModule.routingHeader.fromParams({ | ||
'snapshot.name': request.snapshot.name || '', | ||
'snapshot.name': (_a = request.snapshot.name) !== null && _a !== void 0 ? _a : '', | ||
}); | ||
@@ -524,2 +535,3 @@ this.initialize(); | ||
deleteSnapshot(request, optionsOrCallback, callback) { | ||
var _a; | ||
request = request || {}; | ||
@@ -539,3 +551,3 @@ let options; | ||
this._gaxModule.routingHeader.fromParams({ | ||
snapshot: request.snapshot || '', | ||
snapshot: (_a = request.snapshot) !== null && _a !== void 0 ? _a : '', | ||
}); | ||
@@ -546,2 +558,3 @@ this.initialize(); | ||
seek(request, optionsOrCallback, callback) { | ||
var _a; | ||
request = request || {}; | ||
@@ -561,3 +574,3 @@ let options; | ||
this._gaxModule.routingHeader.fromParams({ | ||
subscription: request.subscription || '', | ||
subscription: (_a = request.subscription) !== null && _a !== void 0 ? _a : '', | ||
}); | ||
@@ -591,2 +604,3 @@ this.initialize(); | ||
listSubscriptions(request, optionsOrCallback, callback) { | ||
var _a; | ||
request = request || {}; | ||
@@ -606,3 +620,3 @@ let options; | ||
this._gaxModule.routingHeader.fromParams({ | ||
project: request.project || '', | ||
project: (_a = request.project) !== null && _a !== void 0 ? _a : '', | ||
}); | ||
@@ -638,2 +652,3 @@ this.initialize(); | ||
listSubscriptionsStream(request, options) { | ||
var _a; | ||
request = request || {}; | ||
@@ -645,3 +660,3 @@ options = options || {}; | ||
this._gaxModule.routingHeader.fromParams({ | ||
project: request.project || '', | ||
project: (_a = request.project) !== null && _a !== void 0 ? _a : '', | ||
}); | ||
@@ -680,2 +695,3 @@ const defaultCallSettings = this._defaults['listSubscriptions']; | ||
listSubscriptionsAsync(request, options) { | ||
var _a; | ||
request = request || {}; | ||
@@ -687,3 +703,3 @@ options = options || {}; | ||
this._gaxModule.routingHeader.fromParams({ | ||
project: request.project || '', | ||
project: (_a = request.project) !== null && _a !== void 0 ? _a : '', | ||
}); | ||
@@ -696,2 +712,3 @@ const defaultCallSettings = this._defaults['listSubscriptions']; | ||
listSnapshots(request, optionsOrCallback, callback) { | ||
var _a; | ||
request = request || {}; | ||
@@ -711,3 +728,3 @@ let options; | ||
this._gaxModule.routingHeader.fromParams({ | ||
project: request.project || '', | ||
project: (_a = request.project) !== null && _a !== void 0 ? _a : '', | ||
}); | ||
@@ -743,2 +760,3 @@ this.initialize(); | ||
listSnapshotsStream(request, options) { | ||
var _a; | ||
request = request || {}; | ||
@@ -750,3 +768,3 @@ options = options || {}; | ||
this._gaxModule.routingHeader.fromParams({ | ||
project: request.project || '', | ||
project: (_a = request.project) !== null && _a !== void 0 ? _a : '', | ||
}); | ||
@@ -785,2 +803,3 @@ const defaultCallSettings = this._defaults['listSnapshots']; | ||
listSnapshotsAsync(request, options) { | ||
var _a; | ||
request = request || {}; | ||
@@ -792,3 +811,3 @@ options = options || {}; | ||
this._gaxModule.routingHeader.fromParams({ | ||
project: request.project || '', | ||
project: (_a = request.project) !== null && _a !== void 0 ? _a : '', | ||
}); | ||
@@ -795,0 +814,0 @@ const defaultCallSettings = this._defaults['listSnapshots']; |
{ | ||
"name": "@google-cloud/pubsub", | ||
"description": "Cloud Pub/Sub Client Library for Node.js", | ||
"version": "3.1.1", | ||
"version": "3.2.0", | ||
"license": "Apache-2.0", | ||
@@ -62,2 +62,3 @@ "author": "Google Inc.", | ||
"google-gax": "^3.3.0", | ||
"heap-js": "^2.2.0", | ||
"is-stream-ended": "^0.1.4", | ||
@@ -99,3 +100,3 @@ "lodash.snakecase": "^4.1.1", | ||
"typescript": "^4.6.4", | ||
"uuid": "^8.0.0", | ||
"uuid": "^9.0.0", | ||
"webpack": "^5.0.0", | ||
@@ -102,0 +103,0 @@ "webpack-cli": "^4.0.0", |
@@ -132,2 +132,3 @@ [//]: # "This README.md file is auto-generated, all changes to this file will be lost." | ||
| Create Subscription With Dead Letter Policy | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/createSubscriptionWithDeadLetterPolicy.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/createSubscriptionWithDeadLetterPolicy.js,samples/README.md) | | ||
| Create an exactly-once delivery subscription | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/createSubscriptionWithExactlyOnceDelivery.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/createSubscriptionWithExactlyOnceDelivery.js,samples/README.md) | | ||
| Create Subscription With Filtering | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/createSubscriptionWithFiltering.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/createSubscriptionWithFiltering.js,samples/README.md) | | ||
@@ -152,2 +153,3 @@ | Create Subscription with ordering enabled | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/createSubscriptionWithOrdering.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/createSubscriptionWithOrdering.js,samples/README.md) | | ||
| Listen For Messages | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/listenForMessages.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/listenForMessages.js,samples/README.md) | | ||
| Listen with exactly-once delivery | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/listenForMessagesWithExactlyOnceDelivery.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/listenForMessagesWithExactlyOnceDelivery.js,samples/README.md) | | ||
| Listen For Protobuf Messages | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/listenForProtobufMessages.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/listenForProtobufMessages.js,samples/README.md) | | ||
@@ -154,0 +156,0 @@ | Listen For Messages With Custom Attributes | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/listenWithCustomAttributes.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/listenWithCustomAttributes.js,samples/README.md) | |
Sorry, the diff of this file is too big to display
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is too big to display
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
3282792
103
55334
247
16
+ Addedheap-js@^2.2.0
+ Addedheap-js@2.6.0(transitive)