@google-cloud/firestore
Advanced tools
Comparing version 4.5.0 to 4.6.0
@@ -21,2 +21,3 @@ /*! | ||
import { WriteResult } from './write-batch'; | ||
import GrpcStatus = FirebaseFirestore.GrpcStatus; | ||
/*! | ||
@@ -30,2 +31,31 @@ * The starting maximum number of operations per second as allowed by the | ||
/** | ||
* The error thrown when a BulkWriter operation fails. | ||
* | ||
* @class BulkWriterError | ||
*/ | ||
export declare class BulkWriterError extends Error { | ||
/** The status code of the error. */ | ||
readonly code: GrpcStatus; | ||
/** The error message of the error. */ | ||
readonly message: string; | ||
/** The document reference the operation was performed on. */ | ||
readonly documentRef: firestore.DocumentReference; | ||
/** The type of operation performed. */ | ||
readonly operationType: 'create' | 'set' | 'update' | 'delete'; | ||
/** How many times this operation has been attempted unsuccessfully. */ | ||
readonly failedAttempts: number; | ||
/** @hideconstructor */ | ||
constructor( | ||
/** The status code of the error. */ | ||
code: GrpcStatus, | ||
/** The error message of the error. */ | ||
message: string, | ||
/** The document reference the operation was performed on. */ | ||
documentRef: firestore.DocumentReference, | ||
/** The type of operation performed. */ | ||
operationType: 'create' | 'set' | 'update' | 'delete', | ||
/** How many times this operation has been attempted unsuccessfully. */ | ||
failedAttempts: number); | ||
} | ||
/** | ||
* A Firestore BulkWriter than can be used to perform a large number of writes | ||
@@ -41,16 +71,45 @@ * in parallel. Writes to the same document will be executed sequentially. | ||
*/ | ||
private maxBatchSize; | ||
private _maxBatchSize; | ||
/** | ||
* A queue of batches to be written. | ||
*/ | ||
private batchQueue; | ||
private _batchQueue; | ||
/** | ||
* Whether this BulkWriter instance is closed. Once closed, it cannot be | ||
* opened again. | ||
* A queue of batches containing operations that need to be retried. | ||
*/ | ||
private closed; | ||
private _retryBatchQueue; | ||
/** | ||
* A list of promises that represent sent batches. Each promise is resolved | ||
* when the batch's response is received. This includes batches from both the | ||
* batchQueue and retryBatchQueue. | ||
*/ | ||
private _pendingBatches; | ||
/** | ||
* A list of promises that represent pending BulkWriter operations. Each | ||
* promise is resolved when the BulkWriter operation resolves. This set | ||
* includes retries. Each retry's promise is added, attempted, and removed | ||
* from this set before scheduling the next retry. | ||
*/ | ||
private _pendingOps; | ||
/** | ||
* Whether this BulkWriter instance has started to close. Afterwards, no | ||
* new operations can be enqueued, except for retry operations scheduled by | ||
* the error handler. | ||
*/ | ||
private _closing; | ||
/** | ||
* Rate limiter used to throttle requests as per the 500/50/5 rule. | ||
*/ | ||
private rateLimiter; | ||
private readonly _rateLimiter; | ||
/** | ||
* The user-provided callback to be run every time a BulkWriter operation | ||
* successfully completes. | ||
*/ | ||
private _successFn; | ||
/** | ||
* The user-provided callback to be run every time a BulkWriter operation | ||
* fails. | ||
*/ | ||
private _errorFn; | ||
/** @hideconstructor */ | ||
constructor(firestore: Firestore, options?: firestore.BulkWriterOptions); | ||
@@ -65,3 +124,4 @@ /** | ||
* @returns {Promise<WriteResult>} A promise that resolves with the result of | ||
* the write. Throws an error if the write fails. | ||
* the write. If the write fails, the promise is rejected with a | ||
* [BulkWriterError]{@link BulkWriterError}. | ||
* | ||
@@ -93,5 +153,5 @@ * @example | ||
* document doesn't exist or was last updated at a different time. | ||
* @returns {Promise<WriteResult>} A promise that resolves with a sentinel | ||
* Timestamp indicating that the delete was successful. Throws an error if | ||
* the write fails. | ||
* @returns {Promise<WriteResult>} A promise that resolves with the result of | ||
* the delete. If the delete fails, the promise is rejected with a | ||
* [BulkWriterError]{@link BulkWriterError}. | ||
* | ||
@@ -139,5 +199,5 @@ * @example | ||
* @returns {Promise<WriteResult>} A promise that resolves with the result of | ||
* the write. Throws an error if the write fails. | ||
* the write. If the write fails, the promise is rejected with a | ||
* [BulkWriterError]{@link BulkWriterError}. | ||
* | ||
* | ||
* @example | ||
@@ -161,2 +221,50 @@ * let bulkWriter = firestore.bulkWriter(); | ||
/** | ||
* Attaches a listener that is run every time a BulkWriter operation | ||
* successfully completes. | ||
* | ||
* @param callback A callback to be called every time a BulkWriter operation | ||
* successfully completes. | ||
* @example | ||
* let bulkWriter = firestore.bulkWriter(); | ||
* | ||
* bulkWriter | ||
* .onWriteResult((documentRef, result) => { | ||
* console.log( | ||
* 'Successfully executed write on document: ', | ||
* documentRef, | ||
* ' at: ', | ||
* result | ||
* ); | ||
* }); | ||
*/ | ||
onWriteResult(callback: (documentRef: firestore.DocumentReference, result: WriteResult) => void): void; | ||
/** | ||
* Attaches an error handler listener that is run every time a BulkWriter | ||
* operation fails. | ||
* | ||
* BulkWriter has a default error handler that retries UNAVAILABLE and | ||
* ABORTED errors up to a maximum of 10 failed attempts. When an error | ||
* handler is specified, the default error handler will be overwritten. | ||
* | ||
* @param shouldRetryCallback A callback to be called every time a BulkWriter | ||
* operation fails. Returning `true` will retry the operation. Returning | ||
* `false` will stop the retry loop. | ||
* @example | ||
* let bulkWriter = firestore.bulkWriter(); | ||
* | ||
* bulkWriter | ||
* .onWriteError((error) => { | ||
* if ( | ||
* error.code === GrpcStatus.UNAVAILABLE && | ||
* error.failedAttempts < MAX_RETRY_ATTEMPTS | ||
* ) { | ||
* return true; | ||
* } else { | ||
* console.log('Failed write at document: ', error.documentRef); | ||
* return false; | ||
* } | ||
* }); | ||
*/ | ||
onWriteError(shouldRetryCallback: (error: BulkWriterError) => boolean): void; | ||
/** | ||
* Commits all writes that have been enqueued up to this point in parallel. | ||
@@ -187,6 +295,9 @@ * | ||
flush(): Promise<void>; | ||
private _flush; | ||
/** | ||
* Commits all enqueued writes and marks the BulkWriter instance as closed. | ||
* | ||
* After calling `close()`, calling any method wil throw an error. | ||
* After calling `close()`, calling any method wil throw an error. Any | ||
* retries scheduled as part of an `onWriteError()` handler will be run | ||
* before the `close()` promise resolves. | ||
* | ||
@@ -211,2 +322,5 @@ * Returns a Promise that resolves when there are no more pending writes. The | ||
close(): Promise<void>; | ||
/** | ||
* Throws an error if the BulkWriter instance has been closed. | ||
*/ | ||
private verifyNotClosed; | ||
@@ -221,4 +335,4 @@ /** | ||
/** | ||
* Creates a new batch and adds it to the BatchQueue. If there is already a | ||
* batch enqueued, sends the batch after a new one is created. | ||
* Creates a new batch and adds it to the appropriate batch queue. If there | ||
* is already a batch enqueued, sends the batch after a new one is created. | ||
* | ||
@@ -229,4 +343,4 @@ * @private | ||
/** | ||
* Attempts to send batches starting from the front of the BatchQueue until a | ||
* batch cannot be sent. | ||
* Attempts to send batches starting from the front of the provided batch | ||
* queue until a batch cannot be sent. | ||
* | ||
@@ -242,2 +356,4 @@ * After a batch is complete, try sending batches again. | ||
* | ||
* @param batchCompletedDeferred A deferred promise that resolves when the | ||
* batch has been sent and received. | ||
* @private | ||
@@ -247,2 +363,6 @@ */ | ||
/** | ||
* Schedules and runs the provided operation. | ||
*/ | ||
private _executeWrite; | ||
/** | ||
* Sets the maximum number of allowed operations in a batch. | ||
@@ -249,0 +369,0 @@ * |
@@ -9,4 +9,4 @@ "use strict"; | ||
const write_batch_1 = require("./write-batch"); | ||
const validate_1 = require("./validate"); | ||
const logger_1 = require("./logger"); | ||
const validate_1 = require("./validate"); | ||
/*! | ||
@@ -62,5 +62,2 @@ * The maximum number of writes that can be in a single batch. | ||
this.state = BatchState.OPEN; | ||
// A deferred promise that is resolved after the batch has been sent, and a | ||
// response is received. | ||
this.completedDeferred = new util_1.Deferred(); | ||
// An array of pending write operations. Only contains writes that have not | ||
@@ -83,3 +80,3 @@ // been resolved. | ||
this.writeBatch.create(documentRef, data); | ||
return this.processOperation(documentRef); | ||
return this.processLastOperation(); | ||
} | ||
@@ -92,3 +89,3 @@ /** | ||
this.writeBatch.delete(documentRef, precondition); | ||
return this.processOperation(documentRef); | ||
return this.processLastOperation(); | ||
} | ||
@@ -101,3 +98,3 @@ /** | ||
this.writeBatch.set(documentRef, data, options); | ||
return this.processOperation(documentRef); | ||
return this.processLastOperation(); | ||
} | ||
@@ -110,3 +107,3 @@ /** | ||
this.writeBatch.update(documentRef, dataOrField, ...preconditionOrValues); | ||
return this.processOperation(documentRef); | ||
return this.processLastOperation(); | ||
} | ||
@@ -117,10 +114,6 @@ /** | ||
*/ | ||
processOperation(documentRef) { | ||
processLastOperation() { | ||
assert(this.state === BatchState.OPEN, 'Batch should be OPEN when adding writes'); | ||
const deferred = new util_1.Deferred(); | ||
this.pendingOps.push({ | ||
writeBatchIndex: this.opCount, | ||
key: documentRef.path, | ||
deferred: deferred, | ||
}); | ||
this.pendingOps.push(deferred); | ||
if (this.opCount === this.maxBatchSize) { | ||
@@ -151,26 +144,15 @@ this.state = BatchState.READY_TO_SEND; | ||
let results = []; | ||
for (let attempt = 0; attempt < backoff_1.MAX_RETRY_ATTEMPTS; attempt++) { | ||
await this.backoff.backoffAndWait(); | ||
try { | ||
results = await this.writeBatch.bulkCommit(); | ||
} | ||
catch (err) { | ||
// Map the failure to each individual write's result. | ||
results = this.pendingOps.map(op => { | ||
return { key: op.key, writeTime: null, status: util_1.wrapError(err, stack) }; | ||
}); | ||
} | ||
this.processResults(results, /* allowRetry= */ true); | ||
if (this.pendingOps.length > 0) { | ||
logger_1.logger('BulkWriter.bulkCommit', null, `Current batch failed at retry #${attempt}. Num failures: ` + | ||
`${this.pendingOps.length}.`); | ||
this.writeBatch = new write_batch_1.WriteBatch(this.firestore, this.writeBatch, new Set(this.pendingOps.map(op => op.writeBatchIndex))); | ||
} | ||
else { | ||
this.completedDeferred.resolve(); | ||
return; | ||
} | ||
try { | ||
results = await this.writeBatch.bulkCommit(); | ||
} | ||
this.processResults(results); | ||
this.completedDeferred.resolve(); | ||
catch (err) { | ||
// Map the failure to each individual write's result. | ||
results = this.pendingOps.map(() => { | ||
return { | ||
writeTime: null, | ||
status: util_1.wrapError(err, stack), | ||
}; | ||
}); | ||
} | ||
return this.processResults(results); | ||
} | ||
@@ -180,38 +162,14 @@ /** | ||
*/ | ||
processResults(results, allowRetry = false) { | ||
const newPendingOps = []; | ||
for (let i = 0; i < results.length; i++) { | ||
const result = results[i]; | ||
async processResults(results) { | ||
await Promise.all(results.map((result, i) => { | ||
const op = this.pendingOps[i]; | ||
if (result.status.code === google_gax_1.Status.OK) { | ||
op.deferred.resolve(result); | ||
op.resolve(result); | ||
} | ||
else if (!allowRetry || !this.shouldRetry(result.status.code)) { | ||
op.deferred.reject(result.status); | ||
} | ||
else { | ||
// Retry the operation if it has not been processed. | ||
// Store the current index of pendingOps to preserve the mapping of | ||
// this operation's index in the underlying WriteBatch. | ||
newPendingOps.push({ | ||
writeBatchIndex: i, | ||
key: op.key, | ||
deferred: op.deferred, | ||
}); | ||
op.reject(result.status); | ||
} | ||
} | ||
this.pendingOps = newPendingOps; | ||
return util_1.silencePromise(op.promise); | ||
})); | ||
} | ||
shouldRetry(code) { | ||
const retryCodes = util_1.getRetryCodes('batchWrite'); | ||
return code !== undefined && retryCodes.includes(code); | ||
} | ||
/** | ||
* Returns a promise that resolves when the batch has been sent, and a | ||
* response is received. | ||
*/ | ||
awaitBulkCommit() { | ||
this.markReadyToSend(); | ||
return this.completedDeferred.promise; | ||
} | ||
markReadyToSend() { | ||
@@ -224,2 +182,29 @@ if (this.state === BatchState.OPEN) { | ||
/** | ||
* The error thrown when a BulkWriter operation fails. | ||
* | ||
* @class BulkWriterError | ||
*/ | ||
class BulkWriterError extends Error { | ||
/** @hideconstructor */ | ||
constructor( | ||
/** The status code of the error. */ | ||
code, | ||
/** The error message of the error. */ | ||
message, | ||
/** The document reference the operation was performed on. */ | ||
documentRef, | ||
/** The type of operation performed. */ | ||
operationType, | ||
/** How many times this operation has been attempted unsuccessfully. */ | ||
failedAttempts) { | ||
super(message); | ||
this.code = code; | ||
this.message = message; | ||
this.documentRef = documentRef; | ||
this.operationType = operationType; | ||
this.failedAttempts = failedAttempts; | ||
} | ||
} | ||
exports.BulkWriterError = BulkWriterError; | ||
/** | ||
* A Firestore BulkWriter than can be used to perform a large number of writes | ||
@@ -231,2 +216,3 @@ * in parallel. Writes to the same document will be executed sequentially. | ||
class BulkWriter { | ||
/** @hideconstructor */ | ||
constructor(firestore, options) { | ||
@@ -238,16 +224,49 @@ var _a, _b; | ||
*/ | ||
this.maxBatchSize = MAX_BATCH_SIZE; | ||
this._maxBatchSize = MAX_BATCH_SIZE; | ||
/** | ||
* A queue of batches to be written. | ||
*/ | ||
this.batchQueue = []; | ||
this._batchQueue = []; | ||
/** | ||
* Whether this BulkWriter instance is closed. Once closed, it cannot be | ||
* opened again. | ||
* A queue of batches containing operations that need to be retried. | ||
*/ | ||
this.closed = false; | ||
this._retryBatchQueue = []; | ||
/** | ||
* A list of promises that represent sent batches. Each promise is resolved | ||
* when the batch's response is received. This includes batches from both the | ||
* batchQueue and retryBatchQueue. | ||
*/ | ||
this._pendingBatches = new Set(); | ||
/** | ||
* A list of promises that represent pending BulkWriter operations. Each | ||
* promise is resolved when the BulkWriter operation resolves. This set | ||
* includes retries. Each retry's promise is added, attempted, and removed | ||
* from this set before scheduling the next retry. | ||
*/ | ||
this._pendingOps = new Set(); | ||
/** | ||
* Whether this BulkWriter instance has started to close. Afterwards, no | ||
* new operations can be enqueued, except for retry operations scheduled by | ||
* the error handler. | ||
*/ | ||
this._closing = false; | ||
/** | ||
* The user-provided callback to be run every time a BulkWriter operation | ||
* successfully completes. | ||
*/ | ||
this._successFn = () => { }; | ||
/** | ||
* The user-provided callback to be run every time a BulkWriter operation | ||
* fails. | ||
*/ | ||
this._errorFn = error => { | ||
const retryCodes = util_1.getRetryCodes('batchWrite'); | ||
return (error.code !== undefined && | ||
retryCodes.includes(error.code) && | ||
error.failedAttempts < backoff_1.MAX_RETRY_ATTEMPTS); | ||
}; | ||
this.firestore._incrementBulkWritersCount(); | ||
validateBulkWriterOptions(options); | ||
if ((options === null || options === void 0 ? void 0 : options.throttling) === false) { | ||
this.rateLimiter = new rate_limiter_1.RateLimiter(Number.POSITIVE_INFINITY, Number.POSITIVE_INFINITY, Number.POSITIVE_INFINITY, Number.POSITIVE_INFINITY); | ||
this._rateLimiter = new rate_limiter_1.RateLimiter(Number.POSITIVE_INFINITY, Number.POSITIVE_INFINITY, Number.POSITIVE_INFINITY, Number.POSITIVE_INFINITY); | ||
} | ||
@@ -273,7 +292,7 @@ else { | ||
// operations per second. | ||
if (startingRate < this.maxBatchSize) { | ||
this.maxBatchSize = startingRate; | ||
if (startingRate < this._maxBatchSize) { | ||
this._maxBatchSize = startingRate; | ||
} | ||
} | ||
this.rateLimiter = new rate_limiter_1.RateLimiter(startingRate, RATE_LIMITER_MULTIPLIER, RATE_LIMITER_MULTIPLIER_MILLIS, maxRate); | ||
this._rateLimiter = new rate_limiter_1.RateLimiter(startingRate, RATE_LIMITER_MULTIPLIER, RATE_LIMITER_MULTIPLIER_MILLIS, maxRate); | ||
} | ||
@@ -289,3 +308,4 @@ } | ||
* @returns {Promise<WriteResult>} A promise that resolves with the result of | ||
* the write. Throws an error if the write fails. | ||
* the write. If the write fails, the promise is rejected with a | ||
* [BulkWriterError]{@link BulkWriterError}. | ||
* | ||
@@ -308,6 +328,5 @@ * @example | ||
this.verifyNotClosed(); | ||
const bulkCommitBatch = this.getEligibleBatch(); | ||
const resultPromise = bulkCommitBatch.create(documentRef, data); | ||
this.sendReadyBatches(); | ||
return resultPromise; | ||
const op = this._executeWrite(documentRef, 'create', bulkCommitBatch => bulkCommitBatch.create(documentRef, data)); | ||
util_1.silencePromise(op); | ||
return op; | ||
} | ||
@@ -324,5 +343,5 @@ /** | ||
* document doesn't exist or was last updated at a different time. | ||
* @returns {Promise<WriteResult>} A promise that resolves with a sentinel | ||
* Timestamp indicating that the delete was successful. Throws an error if | ||
* the write fails. | ||
* @returns {Promise<WriteResult>} A promise that resolves with the result of | ||
* the delete. If the delete fails, the promise is rejected with a | ||
* [BulkWriterError]{@link BulkWriterError}. | ||
* | ||
@@ -345,6 +364,5 @@ * @example | ||
this.verifyNotClosed(); | ||
const bulkCommitBatch = this.getEligibleBatch(); | ||
const resultPromise = bulkCommitBatch.delete(documentRef, precondition); | ||
this.sendReadyBatches(); | ||
return resultPromise; | ||
const op = this._executeWrite(documentRef, 'delete', bulkCommitBatch => bulkCommitBatch.delete(documentRef, precondition)); | ||
util_1.silencePromise(op); | ||
return op; | ||
} | ||
@@ -368,3 +386,4 @@ /** | ||
* @returns {Promise<WriteResult>} A promise that resolves with the result of | ||
* the write. Throws an error if the write fails. | ||
* the write. If the write fails, the promise is rejected with a | ||
* [BulkWriterError]{@link BulkWriterError}. | ||
* | ||
@@ -388,6 +407,5 @@ * | ||
this.verifyNotClosed(); | ||
const bulkCommitBatch = this.getEligibleBatch(); | ||
const resultPromise = bulkCommitBatch.set(documentRef, data, options); | ||
this.sendReadyBatches(); | ||
return resultPromise; | ||
const op = this._executeWrite(documentRef, 'set', bulkCommitBatch => bulkCommitBatch.set(documentRef, data, options)); | ||
util_1.silencePromise(op); | ||
return op; | ||
} | ||
@@ -418,5 +436,5 @@ /** | ||
* @returns {Promise<WriteResult>} A promise that resolves with the result of | ||
* the write. Throws an error if the write fails. | ||
* the write. If the write fails, the promise is rejected with a | ||
* [BulkWriterError]{@link BulkWriterError}. | ||
* | ||
* | ||
* @example | ||
@@ -438,8 +456,59 @@ * let bulkWriter = firestore.bulkWriter(); | ||
this.verifyNotClosed(); | ||
const bulkCommitBatch = this.getEligibleBatch(); | ||
const resultPromise = bulkCommitBatch.update(documentRef, dataOrField, ...preconditionOrValues); | ||
this.sendReadyBatches(); | ||
return resultPromise; | ||
const op = this._executeWrite(documentRef, 'update', bulkCommitBatch => bulkCommitBatch.update(documentRef, dataOrField, ...preconditionOrValues)); | ||
util_1.silencePromise(op); | ||
return op; | ||
} | ||
/** | ||
* Attaches a listener that is run every time a BulkWriter operation | ||
* successfully completes. | ||
* | ||
* @param callback A callback to be called every time a BulkWriter operation | ||
* successfully completes. | ||
* @example | ||
* let bulkWriter = firestore.bulkWriter(); | ||
* | ||
* bulkWriter | ||
* .onWriteResult((documentRef, result) => { | ||
* console.log( | ||
* 'Successfully executed write on document: ', | ||
* documentRef, | ||
* ' at: ', | ||
* result | ||
* ); | ||
* }); | ||
*/ | ||
onWriteResult(callback) { | ||
this._successFn = callback; | ||
} | ||
/** | ||
* Attaches an error handler listener that is run every time a BulkWriter | ||
* operation fails. | ||
* | ||
* BulkWriter has a default error handler that retries UNAVAILABLE and | ||
* ABORTED errors up to a maximum of 10 failed attempts. When an error | ||
* handler is specified, the default error handler will be overwritten. | ||
* | ||
* @param shouldRetryCallback A callback to be called every time a BulkWriter | ||
* operation fails. Returning `true` will retry the operation. Returning | ||
* `false` will stop the retry loop. | ||
* @example | ||
* let bulkWriter = firestore.bulkWriter(); | ||
* | ||
* bulkWriter | ||
* .onWriteError((error) => { | ||
* if ( | ||
* error.code === GrpcStatus.UNAVAILABLE && | ||
* error.failedAttempts < MAX_RETRY_ATTEMPTS | ||
* ) { | ||
* return true; | ||
* } else { | ||
* console.log('Failed write at document: ', error.documentRef); | ||
* return false; | ||
* } | ||
* }); | ||
*/ | ||
onWriteError(shouldRetryCallback) { | ||
this._errorFn = shouldRetryCallback; | ||
} | ||
/** | ||
* Commits all writes that have been enqueued up to this point in parallel. | ||
@@ -469,13 +538,30 @@ * | ||
*/ | ||
async flush() { | ||
flush() { | ||
this.verifyNotClosed(); | ||
const trackedBatches = this.batchQueue; | ||
const writePromises = trackedBatches.map(batch => batch.awaitBulkCommit()); | ||
this.sendReadyBatches(); | ||
await Promise.all(writePromises); | ||
// Copy the pending ops at the time flush() was called. | ||
return this._flush(Array.from(this._pendingOps)); | ||
} | ||
async _flush(pendingOps) { | ||
let batchQueue = this._batchQueue; | ||
batchQueue.forEach(batch => batch.markReadyToSend()); | ||
// Send all scheduled operations on the BatchQueue first. | ||
this.sendReadyBatches(batchQueue); | ||
await Promise.all(this._pendingBatches); | ||
// Afterwards, send all accumulated retry operations. Wait until the | ||
// retryBatchQueue is cleared. This way, operations scheduled after | ||
// flush() will not be sent until the retries are completed. | ||
batchQueue = this._retryBatchQueue; | ||
if (batchQueue.length > 0) { | ||
batchQueue.forEach(batch => batch.markReadyToSend()); | ||
this.sendReadyBatches(batchQueue); | ||
} | ||
// Make sure user promises resolve before flush() resolves. | ||
return util_1.silencePromise(Promise.all(pendingOps)); | ||
} | ||
/** | ||
* Commits all enqueued writes and marks the BulkWriter instance as closed. | ||
* | ||
* After calling `close()`, calling any method wil throw an error. | ||
* After calling `close()`, calling any method wil throw an error. Any | ||
* retries scheduled as part of an `onWriteError()` handler will be run | ||
* before the `close()` promise resolves. | ||
* | ||
@@ -503,7 +589,10 @@ * Returns a Promise that resolves when there are no more pending writes. The | ||
const flushPromise = this.flush(); | ||
this.closed = true; | ||
this._closing = true; | ||
return flushPromise; | ||
} | ||
/** | ||
* Throws an error if the BulkWriter instance has been closed. | ||
*/ | ||
verifyNotClosed() { | ||
if (this.closed) { | ||
if (this._closing) { | ||
throw new Error('BulkWriter has already been closed.'); | ||
@@ -518,5 +607,5 @@ } | ||
*/ | ||
getEligibleBatch() { | ||
if (this.batchQueue.length > 0) { | ||
const lastBatch = this.batchQueue[this.batchQueue.length - 1]; | ||
getEligibleBatch(batchQueue) { | ||
if (batchQueue.length > 0) { | ||
const lastBatch = batchQueue[batchQueue.length - 1]; | ||
if (lastBatch.state === BatchState.OPEN) { | ||
@@ -526,22 +615,22 @@ return lastBatch; | ||
} | ||
return this.createNewBatch(); | ||
return this.createNewBatch(batchQueue); | ||
} | ||
/** | ||
* Creates a new batch and adds it to the BatchQueue. If there is already a | ||
* batch enqueued, sends the batch after a new one is created. | ||
* Creates a new batch and adds it to the appropriate batch queue. If there | ||
* is already a batch enqueued, sends the batch after a new one is created. | ||
* | ||
* @private | ||
*/ | ||
createNewBatch() { | ||
const newBatch = new BulkCommitBatch(this.firestore, this.firestore.batch(), this.maxBatchSize); | ||
if (this.batchQueue.length > 0) { | ||
this.batchQueue[this.batchQueue.length - 1].markReadyToSend(); | ||
this.sendReadyBatches(); | ||
createNewBatch(batchQueue) { | ||
const newBatch = new BulkCommitBatch(this.firestore, this.firestore.batch(), this._maxBatchSize); | ||
if (batchQueue.length > 0) { | ||
batchQueue[batchQueue.length - 1].markReadyToSend(); | ||
this.sendReadyBatches(batchQueue); | ||
} | ||
this.batchQueue.push(newBatch); | ||
batchQueue.push(newBatch); | ||
return newBatch; | ||
} | ||
/** | ||
* Attempts to send batches starting from the front of the BatchQueue until a | ||
* batch cannot be sent. | ||
* Attempts to send batches starting from the front of the provided batch | ||
* queue until a batch cannot be sent. | ||
* | ||
@@ -552,17 +641,24 @@ * After a batch is complete, try sending batches again. | ||
*/ | ||
sendReadyBatches() { | ||
const unsentBatches = this.batchQueue.filter(batch => batch.state === BatchState.READY_TO_SEND); | ||
sendReadyBatches(batchQueue) { | ||
let index = 0; | ||
while (index < unsentBatches.length && | ||
unsentBatches[index].state === BatchState.READY_TO_SEND) { | ||
const batch = unsentBatches[index]; | ||
while (index < batchQueue.length && | ||
batchQueue[index].state === BatchState.READY_TO_SEND) { | ||
const batch = batchQueue[index]; | ||
// Deferred promise that resolves when the current batch or its | ||
// scheduling attempt completes. | ||
const batchCompletedDeferred = new util_1.Deferred(); | ||
this._pendingBatches.add(batchCompletedDeferred.promise); | ||
// Send the batch if it is under the rate limit, or schedule another | ||
// attempt after the appropriate timeout. | ||
const delayMs = this.rateLimiter.getNextRequestDelayMs(batch.opCount); | ||
const delayMs = this._rateLimiter.getNextRequestDelayMs(batch.opCount); | ||
assert(delayMs !== -1, 'Batch size should be under capacity'); | ||
if (delayMs === 0) { | ||
this.sendBatch(batch); | ||
this.sendBatch(batch, batchQueue, batchCompletedDeferred); | ||
} | ||
else { | ||
backoff_1.delayExecution(() => this.sendReadyBatches(), delayMs); | ||
backoff_1.delayExecution(() => { | ||
this.sendReadyBatches(batchQueue); | ||
batchCompletedDeferred.resolve(); | ||
this._pendingBatches.delete(batchCompletedDeferred.promise); | ||
}, delayMs); | ||
break; | ||
@@ -577,16 +673,59 @@ } | ||
* | ||
* @param batchCompletedDeferred A deferred promise that resolves when the | ||
* batch has been sent and received. | ||
* @private | ||
*/ | ||
sendBatch(batch) { | ||
const success = this.rateLimiter.tryMakeRequest(batch.opCount); | ||
sendBatch(batch, batchQueue, batchCompletedDeferred) { | ||
const success = this._rateLimiter.tryMakeRequest(batch.opCount); | ||
assert(success, 'Batch should be under rate limit to be sent.'); | ||
batch.bulkCommit().then(() => { | ||
return batch.bulkCommit().then(() => { | ||
// Remove the batch from the BatchQueue after it has been processed. | ||
const batchIndex = this.batchQueue.indexOf(batch); | ||
const batchIndex = batchQueue.indexOf(batch); | ||
assert(batchIndex !== -1, 'The batch should be in the BatchQueue'); | ||
this.batchQueue.splice(batchIndex, 1); | ||
this.sendReadyBatches(); | ||
batchQueue.splice(batchIndex, 1); | ||
if (batchQueue === this._retryBatchQueue) { | ||
batchQueue.forEach(batch => batch.markReadyToSend()); | ||
} | ||
batchCompletedDeferred.resolve(); | ||
this._pendingBatches.delete(batchCompletedDeferred.promise); | ||
this.sendReadyBatches(batchQueue); | ||
}); | ||
} | ||
/** | ||
* Schedules and runs the provided operation. | ||
*/ | ||
async _executeWrite(documentRef, operationType, operationFn) { | ||
// A deferred promise that resolves when operationFn completes. | ||
const operationCompletedDeferred = new util_1.Deferred(); | ||
this._pendingOps.add(operationCompletedDeferred.promise); | ||
try { | ||
for (let failedAttempts = 0;; ++failedAttempts) { | ||
const batchQueue = failedAttempts > 0 ? this._retryBatchQueue : this._batchQueue; | ||
const bulkCommitBatch = this.getEligibleBatch(batchQueue); | ||
// Send ready batches if this is the first attempt. Subsequent retry | ||
// batches are scheduled after the initial batch returns. | ||
if (failedAttempts === 0) { | ||
this.sendReadyBatches(batchQueue); | ||
} | ||
try { | ||
const operationResult = await operationFn(bulkCommitBatch); | ||
this._successFn(documentRef, operationResult); | ||
return operationResult; | ||
} | ||
catch (error) { | ||
const bulkWriterError = new BulkWriterError(error.code, error.message, documentRef, operationType, failedAttempts); | ||
const shouldRetry = this._errorFn(bulkWriterError); | ||
logger_1.logger('BulkWriter.errorFn', null, 'Running error callback on error code:', error.code, ', shouldRetry:', shouldRetry); | ||
if (!shouldRetry) { | ||
throw bulkWriterError; | ||
} | ||
} | ||
} | ||
} | ||
finally { | ||
operationCompletedDeferred.resolve(); | ||
this._pendingOps.delete(operationCompletedDeferred.promise); | ||
} | ||
} | ||
/** | ||
* Sets the maximum number of allowed operations in a batch. | ||
@@ -598,3 +737,3 @@ * | ||
_setMaxBatchSize(size) { | ||
this.maxBatchSize = size; | ||
this._maxBatchSize = size; | ||
} | ||
@@ -608,3 +747,3 @@ /** | ||
_getRateLimiter() { | ||
return this.rateLimiter; | ||
return this._rateLimiter; | ||
} | ||
@@ -611,0 +750,0 @@ } |
@@ -311,9 +311,10 @@ "use strict"; | ||
for (let attempt = 0; attempt < maxAttempts; ++attempt) { | ||
if (lastError) { | ||
logger_1.logger('Firestore.runTransaction', this._requestTag, 'Retrying transaction after error:', lastError); | ||
} | ||
this._writeBatch._reset(); | ||
await this.maybeBackoff(lastError); | ||
await this.begin(); | ||
try { | ||
if (lastError) { | ||
logger_1.logger('Firestore.runTransaction', this._requestTag, 'Retrying transaction after error:', lastError); | ||
await this.rollback(); | ||
} | ||
this._writeBatch._reset(); | ||
await this.maybeBackoff(lastError); | ||
await this.begin(); | ||
const promise = updateFunction(this); | ||
@@ -329,12 +330,10 @@ if (!(promise instanceof Promise)) { | ||
logger_1.logger('Firestore.runTransaction', this._requestTag, 'Rolling back transaction after callback error:', err); | ||
await this.rollback(); | ||
if (isRetryableTransactionError(err)) { | ||
lastError = err; | ||
lastError = err; | ||
if (!this._transactionId || !isRetryableTransactionError(err)) { | ||
break; | ||
} | ||
else { | ||
return Promise.reject(err); // Callback failed w/ non-retryable error | ||
} | ||
} | ||
} | ||
logger_1.logger('Firestore.runTransaction', this._requestTag, 'Transaction not eligible for retry, returning error: %s', lastError); | ||
await this.rollback(); | ||
return Promise.reject(lastError); | ||
@@ -432,2 +431,7 @@ } | ||
return true; | ||
case google_gax_1.Status.INVALID_ARGUMENT: | ||
// The Firestore backend uses "INVALID_ARGUMENT" for transactions | ||
// IDs that have expired. While INVALID_ARGUMENT is generally not | ||
// retryable, we retry this specific case. | ||
return !!error.message.match(/transaction has expired/); | ||
default: | ||
@@ -434,0 +438,0 @@ return false; |
@@ -91,2 +91,10 @@ /*! | ||
/** | ||
* Returns a promise with a void return type. The returned promise swallows all | ||
* errors and never throws. | ||
* | ||
* This is primarily used to wait for a promise to complete when the result of | ||
* the promise will be discarded. | ||
*/ | ||
export declare function silencePromise(promise: Promise<unknown>): Promise<void>; | ||
/** | ||
* Wraps the provided error in a new error that includes the provided stack. | ||
@@ -93,0 +101,0 @@ * |
@@ -149,2 +149,13 @@ "use strict"; | ||
/** | ||
* Returns a promise with a void return type. The returned promise swallows all | ||
* errors and never throws. | ||
* | ||
* This is primarily used to wait for a promise to complete when the result of | ||
* the promise will be discarded. | ||
*/ | ||
function silencePromise(promise) { | ||
return promise.then(() => { }, () => { }); | ||
} | ||
exports.silencePromise = silencePromise; | ||
/** | ||
* Wraps the provided error in a new error that includes the provided stack. | ||
@@ -151,0 +162,0 @@ * |
@@ -17,2 +17,3 @@ /*! | ||
import * as firestore from '@google-cloud/firestore'; | ||
import { google } from '../protos/firestore_v1_proto_api'; | ||
import { Firestore } from './index'; | ||
@@ -22,2 +23,3 @@ import { FieldPath } from './path'; | ||
import { RequiredArgumentOptions } from './validate'; | ||
import api = google.firestore.v1; | ||
import { GoogleError } from 'google-gax'; | ||
@@ -73,2 +75,8 @@ /** | ||
/** | ||
* A lazily-evaluated write that allows us to detect the Project ID before | ||
* serializing the request. | ||
* @private | ||
*/ | ||
export declare type PendingWriteOp = () => api.IWrite; | ||
/** | ||
* A Firestore WriteBatch that can be used to atomically commit multiple write | ||
@@ -94,9 +102,3 @@ * operations at once. | ||
* @hideconstructor | ||
* | ||
* @param firestore The Firestore Database client. | ||
* @param retryBatch The WriteBatch that needs to be retried. | ||
* @param indexesToRetry The indexes of the operations from the provided | ||
* WriteBatch that need to be retried. | ||
*/ | ||
constructor(firestore: Firestore, retryBatch: WriteBatch, indexesToRetry: Set<number>); | ||
constructor(firestore: Firestore); | ||
@@ -103,0 +105,0 @@ /** |
@@ -92,3 +92,6 @@ "use strict"; | ||
class WriteBatch { | ||
constructor(firestore, retryBatch, indexesToRetry) { | ||
/** | ||
* @hideconstructor | ||
*/ | ||
constructor(firestore) { | ||
/** | ||
@@ -106,9 +109,2 @@ * An array of document paths and the corresponding write operations that are | ||
this._allowUndefined = !!firestore._settings.ignoreUndefinedProperties; | ||
if (retryBatch) { | ||
// Creates a new WriteBatch containing only the indexes from the provided | ||
// indexes to retry. | ||
for (const index of indexesToRetry.values()) { | ||
this._ops.push(retryBatch._ops[index]); | ||
} | ||
} | ||
} | ||
@@ -115,0 +111,0 @@ /** |
@@ -7,2 +7,14 @@ # Changelog | ||
## [4.6.0](https://www.github.com/googleapis/nodejs-firestore/compare/v4.5.0...v4.6.0) (2020-11-03) | ||
### Features | ||
* add onWriteError() and onWriteResult() handlers to BulkWriter ([#1315](https://www.github.com/googleapis/nodejs-firestore/issues/1315)) ([a173f4d](https://www.github.com/googleapis/nodejs-firestore/commit/a173f4defab7a6e750907fcb86431c56fcb3d4cf)) | ||
### Bug Fixes | ||
* retry transactions that fail with expired transaction IDs ([#1347](https://www.github.com/googleapis/nodejs-firestore/issues/1347)) ([a18ab50](https://www.github.com/googleapis/nodejs-firestore/commit/a18ab50f3304f1154caaaab9768b736bdb3d8442)) | ||
## [4.5.0](https://www.github.com/googleapis/nodejs-firestore/compare/v4.4.0...v4.5.0) (2020-10-26) | ||
@@ -9,0 +21,0 @@ |
{ | ||
"name": "@google-cloud/firestore", | ||
"description": "Firestore Client Library for Node.js", | ||
"version": "4.5.0", | ||
"version": "4.6.0", | ||
"license": "Apache-2.0", | ||
@@ -6,0 +6,0 @@ "author": "Google Inc.", |
Sorry, the diff of this file is too big to display
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
4457031
83001