@google-cloud/firestore
Advanced tools
Comparing version 4.0.0 to 4.1.0
@@ -25,3 +25,2 @@ /*! | ||
* @class | ||
* @private | ||
*/ | ||
@@ -28,0 +27,0 @@ export declare class BulkWriter { |
"use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
const google_gax_1 = require("google-gax"); | ||
const assert = require("assert"); | ||
@@ -8,6 +9,7 @@ const backoff_1 = require("./backoff"); | ||
const write_batch_1 = require("./write-batch"); | ||
const logger_1 = require("./logger"); | ||
/*! | ||
* The maximum number of writes that can be in a single batch. | ||
*/ | ||
const MAX_BATCH_SIZE = 500; | ||
const MAX_BATCH_SIZE = 20; | ||
/*! | ||
@@ -59,9 +61,9 @@ * The starting maximum number of operations per second as allowed by the | ||
this.state = BatchState.OPEN; | ||
// The set of document reference paths present in the WriteBatch. | ||
this.docPaths = new Set(); | ||
// A deferred promise that is resolved after the batch has been sent, and a | ||
// response is received. | ||
this.completedDeferred = new util_1.Deferred(); | ||
// A map from each WriteBatch operation to its corresponding result. | ||
this.resultsMap = new Map(); | ||
// A map from each write's document path to its corresponding result. | ||
// Only contains writes that have not been resolved. | ||
this.pendingOps = new Map(); | ||
this.backoff = new backoff_1.ExponentialBackoff(); | ||
} | ||
@@ -72,3 +74,3 @@ /** | ||
get opCount() { | ||
return this.resultsMap.size; | ||
return this.pendingOps.size; | ||
} | ||
@@ -112,7 +114,6 @@ /** | ||
processOperation(documentRef) { | ||
assert(!this.docPaths.has(documentRef.path), 'Batch should not contain writes to the same document'); | ||
assert(!this.pendingOps.has(documentRef.path), 'Batch should not contain writes to the same document'); | ||
assert(this.state === BatchState.OPEN, 'Batch should be OPEN when adding writes'); | ||
this.docPaths.add(documentRef.path); | ||
const deferred = new util_1.Deferred(); | ||
this.resultsMap.set(this.opCount, deferred); | ||
this.pendingOps.set(documentRef.path, deferred); | ||
if (this.opCount === this.maxBatchSize) { | ||
@@ -131,6 +132,9 @@ this.state = BatchState.READY_TO_SEND; | ||
/** | ||
* Commits the batch and returns a promise that resolves with the result of | ||
* all writes in this batch. | ||
* Commits the batch and returns a promise that resolves when all the writes | ||
* in the batch have finished. | ||
* | ||
* If any writes in the batch fail with a retryable error, this method will | ||
* retry the failed writes. | ||
*/ | ||
bulkCommit() { | ||
async bulkCommit() { | ||
assert(this.state === BatchState.READY_TO_SEND, 'The batch should be marked as READY_TO_SEND before committing'); | ||
@@ -140,5 +144,29 @@ this.state = BatchState.SENT; | ||
const stack = Error().stack; | ||
return this.writeBatch.bulkCommit().catch(err => { | ||
throw util_1.wrapError(err, stack); | ||
}); | ||
let results = []; | ||
for (let attempt = 0; attempt < backoff_1.MAX_RETRY_ATTEMPTS; attempt++) { | ||
await this.backoff.backoffAndWait(); | ||
try { | ||
results = await this.writeBatch.bulkCommit(); | ||
} | ||
catch (err) { | ||
// Map the failure to each individual write's result. | ||
results = [...this.pendingOps.keys()].map(path => { | ||
return { key: path, writeTime: null, status: util_1.wrapError(err, stack) }; | ||
}); | ||
} | ||
this.processResults(results); | ||
if (this.pendingOps.size > 0) { | ||
logger_1.logger('BulkWriter.bulkCommit', null, `Current batch failed at retry #${attempt}. Num failures: ` + | ||
`${this.pendingOps.size}.`); | ||
this.writeBatch = new write_batch_1.WriteBatch(this.firestore, this.writeBatch, [ | ||
...this.pendingOps.keys(), | ||
]); | ||
} | ||
else { | ||
this.completedDeferred.resolve(); | ||
return; | ||
} | ||
} | ||
this.failRemainingOperations(results); | ||
this.completedDeferred.resolve(); | ||
} | ||
@@ -148,15 +176,35 @@ /** | ||
*/ | ||
processResults(results, error) { | ||
if (error === undefined) { | ||
for (let i = 0; i < this.opCount; i++) { | ||
this.resultsMap.get(i).resolve(results[i]); | ||
processResults(results) { | ||
for (const result of results) { | ||
if (result.status.code === google_gax_1.Status.OK) { | ||
this.pendingOps.get(result.key).resolve(result); | ||
this.pendingOps.delete(result.key); | ||
} | ||
} | ||
else { | ||
for (let i = 0; i < this.opCount; i++) { | ||
this.resultsMap.get(i).reject(error); | ||
else if (!this.shouldRetry(result.status.code)) { | ||
this.pendingOps.get(result.key).reject(result.status); | ||
this.pendingOps.delete(result.key); | ||
} | ||
} | ||
this.completedDeferred.resolve(); | ||
} | ||
failRemainingOperations(results) { | ||
for (const result of results) { | ||
assert(result.status.code !== google_gax_1.Status.OK, 'Should not fail successful operation'); | ||
this.pendingOps.get(result.key).reject(result.status); | ||
this.pendingOps.delete(result.key); | ||
} | ||
} | ||
shouldRetry(code) { | ||
const retryCodes = util_1.getRetryCodes('batchWrite'); | ||
return code !== undefined && retryCodes.includes(code); | ||
} | ||
hasPath(path) { | ||
for (const [docPath] of this.pendingOps) { | ||
if (docPath === path) | ||
return true; | ||
} | ||
return false; | ||
} | ||
docPaths() { | ||
return this.pendingOps.keys(); | ||
} | ||
/** | ||
@@ -181,3 +229,2 @@ * Returns a promise that resolves when the batch has been sent, and a | ||
* @class | ||
* @private | ||
*/ | ||
@@ -438,4 +485,3 @@ class BulkWriter { | ||
const lastBatch = this.batchQueue[this.batchQueue.length - 1]; | ||
if (lastBatch.state === BatchState.OPEN && | ||
!lastBatch.docPaths.has(ref.path)) { | ||
if (lastBatch.state === BatchState.OPEN && !lastBatch.hasPath(ref.path)) { | ||
return lastBatch; | ||
@@ -498,11 +544,3 @@ } | ||
assert(success, 'Batch should be under rate limit to be sent.'); | ||
batch | ||
.bulkCommit() | ||
.then(results => { | ||
batch.processResults(results); | ||
}) | ||
.catch((error) => { | ||
batch.processResults([], error); | ||
}) | ||
.then(() => { | ||
batch.bulkCommit().then(() => { | ||
// Remove the batch from the BatchQueue after it has been processed. | ||
@@ -526,6 +564,6 @@ const batchIndex = this.batchQueue.indexOf(batch); | ||
} | ||
for (const path of batch.docPaths) { | ||
for (const path of batch.docPaths()) { | ||
const isRefInFlight = this.batchQueue | ||
.filter(batch => batch.state === BatchState.SENT) | ||
.find(batch => batch.docPaths.has(path)) !== undefined; | ||
.find(batch => batch.hasPath(path)) !== undefined; | ||
if (isRefInFlight) { | ||
@@ -532,0 +570,0 @@ // eslint-disable-next-line no-console |
@@ -27,6 +27,7 @@ /*! | ||
import { Transaction } from './transaction'; | ||
import { BulkWriterOptions, FirestoreStreamingMethod, FirestoreUnaryMethod } from './types'; | ||
import { FirestoreStreamingMethod, FirestoreUnaryMethod } from './types'; | ||
import { WriteBatch } from './write-batch'; | ||
import api = google.firestore.v1; | ||
export { CollectionReference, DocumentReference, QuerySnapshot, Query, } from './reference'; | ||
export { BulkWriter } from './bulk-writer'; | ||
export { DocumentSnapshot, QueryDocumentSnapshot } from './document'; | ||
@@ -41,3 +42,2 @@ export { FieldValue } from './field-value'; | ||
export { setLogFunction } from './logger'; | ||
export { BulkWriterOptions } from './types'; | ||
export { Status as GrpcStatus } from 'google-gax'; | ||
@@ -382,3 +382,2 @@ /** | ||
* | ||
* @private | ||
* @param {object=} options BulkWriter options. | ||
@@ -409,3 +408,3 @@ * @param {boolean=} options.disableThrottling Whether to disable throttling | ||
*/ | ||
_bulkWriter(options?: BulkWriterOptions): BulkWriter; | ||
bulkWriter(options?: firestore.BulkWriterOptions): BulkWriter; | ||
/** | ||
@@ -412,0 +411,0 @@ * Creates a [DocumentSnapshot]{@link DocumentSnapshot} or a |
@@ -44,2 +44,4 @@ "use strict"; | ||
exports.Query = reference_3.Query; | ||
var bulk_writer_2 = require("./bulk-writer"); | ||
exports.BulkWriter = bulk_writer_2.BulkWriter; | ||
var document_2 = require("./document"); | ||
@@ -571,3 +573,2 @@ exports.DocumentSnapshot = document_2.DocumentSnapshot; | ||
* | ||
* @private | ||
* @param {object=} options BulkWriter options. | ||
@@ -598,3 +599,3 @@ * @param {boolean=} options.disableThrottling Whether to disable throttling | ||
*/ | ||
_bulkWriter(options) { | ||
bulkWriter(options) { | ||
return new bulk_writer_1.BulkWriter(this, !(options === null || options === void 0 ? void 0 : options.disableThrottling)); | ||
@@ -601,0 +602,0 @@ } |
@@ -80,10 +80,2 @@ /*! | ||
/** | ||
* An options object that can be used to disable request throttling in | ||
* BulkWriter. | ||
*/ | ||
export interface BulkWriterOptions { | ||
/** Whether to disable throttling. */ | ||
readonly disableThrottling?: boolean; | ||
} | ||
/** | ||
* A Firestore Proto value in ProtoJs format. | ||
@@ -90,0 +82,0 @@ * @private |
@@ -17,2 +17,6 @@ { | ||
"UNAVAILABLE" | ||
], | ||
"aborted_unavailable": [ | ||
"ABORTED", | ||
"UNAVAILABLE" | ||
] | ||
@@ -97,3 +101,4 @@ }, | ||
"BatchWrite": { | ||
"retry_codes_name": "non_idempotent", | ||
"timeout_millis": 60000, | ||
"retry_codes_name": "aborted_unavailable", | ||
"retry_params_name": "default" | ||
@@ -100,0 +105,0 @@ }, |
@@ -66,5 +66,6 @@ /*! | ||
export declare class BatchWriteResult { | ||
readonly key: string; | ||
readonly writeTime: Timestamp | null; | ||
readonly status: GoogleError; | ||
constructor(writeTime: Timestamp | null, status: GoogleError); | ||
constructor(key: string, writeTime: Timestamp | null, status: GoogleError); | ||
} | ||
@@ -82,4 +83,6 @@ /** | ||
/** | ||
* An array of write operations that are executed as part of the commit. The | ||
* resulting `api.IWrite` will be sent to the backend. | ||
* An array of document paths and the corresponding write operations that are | ||
* executed as part of the commit. The resulting `api.IWrite` will be sent to | ||
* the backend. | ||
* | ||
* @private | ||
@@ -93,3 +96,7 @@ */ | ||
* @param firestore The Firestore Database client. | ||
* @param retryBatch The WriteBatch that needs to be retried. | ||
* @param docsToRetry The documents from the provided WriteBatch that need | ||
* to be retried. | ||
*/ | ||
constructor(firestore: Firestore, retryBatch: WriteBatch, docsToRetry: string[]); | ||
constructor(firestore: Firestore); | ||
@@ -96,0 +103,0 @@ /** |
@@ -79,3 +79,4 @@ "use strict"; | ||
class BatchWriteResult { | ||
constructor(writeTime, status) { | ||
constructor(key, writeTime, status) { | ||
this.key = key; | ||
this.writeTime = writeTime; | ||
@@ -93,11 +94,8 @@ this.status = status; | ||
class WriteBatch { | ||
/** | ||
* @hideconstructor | ||
* | ||
* @param firestore The Firestore Database client. | ||
*/ | ||
constructor(firestore) { | ||
constructor(firestore, retryBatch, docsToRetry) { | ||
/** | ||
* An array of write operations that are executed as part of the commit. The | ||
* resulting `api.IWrite` will be sent to the backend. | ||
* An array of document paths and the corresponding write operations that are | ||
* executed as part of the commit. The resulting `api.IWrite` will be sent to | ||
* the backend. | ||
* | ||
* @private | ||
@@ -110,2 +108,7 @@ */ | ||
this._allowUndefined = !!firestore._settings.ignoreUndefinedProperties; | ||
if (retryBatch) { | ||
// Creates a new WriteBatch containing only the operations from the | ||
// provided document paths to retry. | ||
this._ops = retryBatch._ops.filter(v => docsToRetry.indexOf(v.docPath) !== -1); | ||
} | ||
} | ||
@@ -168,3 +171,3 @@ /** | ||
}; | ||
this._ops.push(op); | ||
this._ops.push({ docPath: documentRef.path, op }); | ||
return this; | ||
@@ -207,3 +210,3 @@ } | ||
}; | ||
this._ops.push(op); | ||
this._ops.push({ docPath: documentRef.path, op }); | ||
return this; | ||
@@ -282,3 +285,3 @@ } | ||
}; | ||
this._ops.push(op); | ||
this._ops.push({ docPath: documentRef.path, op }); | ||
return this; | ||
@@ -400,3 +403,3 @@ } | ||
}; | ||
this._ops.push(op); | ||
this._ops.push({ docPath: documentRef.path, op }); | ||
return this; | ||
@@ -444,7 +447,7 @@ } | ||
database, | ||
writes: this._ops.map(op => op()), | ||
writes: this._ops.map(op => op.op()), | ||
}; | ||
const retryCodes = [google_gax_1.Status.ABORTED, ...util_1.getRetryCodes('commit')]; | ||
const retryCodes = util_1.getRetryCodes('batchWrite'); | ||
const response = await this._firestore.request('batchWrite', request, tag, retryCodes); | ||
return (response.writeResults || []).map((result, i) => { | ||
return response.writeResults.map((result, i) => { | ||
const status = response.status[i]; | ||
@@ -460,3 +463,3 @@ const error = new google_gax_1.GoogleError(status.message || undefined); | ||
: null; | ||
return new BatchWriteResult(updateTime, error); | ||
return new BatchWriteResult(this._ops[i].docPath, updateTime, error); | ||
}); | ||
@@ -483,3 +486,3 @@ } | ||
database, | ||
writes: this._ops.map(op => op()), | ||
writes: this._ops.map(op => op.op()), | ||
}; | ||
@@ -486,0 +489,0 @@ if (commitOptions === null || commitOptions === void 0 ? void 0 : commitOptions.transactionId) { |
@@ -7,2 +7,9 @@ # Changelog | ||
## [4.1.0](https://www.github.com/googleapis/nodejs-firestore/compare/v4.0.0...v4.1.0) (2020-07-08) | ||
### Features | ||
* Added `Firestore.bulkWriter()`, which performs large scale writes in parallel. By default, BulkWriter throttles writes according to the "500/50/5" rule and retries writes that fail due to contention. ([#1252](https://www.github.com/googleapis/nodejs-firestore/issues/1252)) ([d0c6c4b](https://www.github.com/googleapis/nodejs-firestore/commit/d0c6c4b116e096a1bb59c89de26cedb8cf5f1224)) | ||
## [4.0.0](https://www.github.com/googleapis/nodejs-firestore/compare/v3.8.6...v4.0.0) (2020-06-24) | ||
@@ -9,0 +16,0 @@ |
{ | ||
"name": "@google-cloud/firestore", | ||
"description": "Firestore Client Library for Node.js", | ||
"version": "4.0.0", | ||
"version": "4.1.0", | ||
"license": "Apache-2.0", | ||
@@ -85,4 +85,4 @@ "author": "Google Inc.", | ||
"typescript": "3.8.3", | ||
"through2": "^3.0.0" | ||
"through2": "^4.0.0" | ||
} | ||
} |
@@ -452,2 +452,147 @@ /** | ||
/** | ||
* A Firestore BulkWriter than can be used to perform a large number of writes | ||
* in parallel. Writes to the same document will be executed sequentially. | ||
* | ||
* @class | ||
*/ | ||
export class BulkWriter { | ||
private constructor(); | ||
/** | ||
* Create a document with the provided data. This single operation will fail | ||
* if a document exists at its location. | ||
* | ||
* @param documentRef A reference to the document to be | ||
* created. | ||
* @param data The object to serialize as the document. | ||
* @returns A promise that resolves with the result | ||
* of the write. Throws an error if the write fails. | ||
*/ | ||
create( | ||
documentRef: DocumentReference, | ||
data: DocumentData | ||
): Promise<WriteResult>; | ||
/** | ||
* Delete a document from the database. | ||
* | ||
* @param documentRef A reference to the document to be | ||
* deleted. | ||
* @param precondition A precondition to enforce for this | ||
* delete. | ||
* @param precondition.lastUpdateTime If set, enforces that the | ||
* document was last updated at lastUpdateTime. Fails the batch if the | ||
* document doesn't exist or was last updated at a different time. | ||
* @returns A promise that resolves with the result | ||
* of the write. Throws an error if the write fails. | ||
*/ | ||
delete( | ||
documentRef: DocumentReference, | ||
precondition?: Precondition | ||
): Promise<WriteResult>; | ||
/** | ||
* Write to the document referred to by the provided | ||
* [DocumentReference]{@link DocumentReference}. If the document does not | ||
* exist yet, it will be created. If you pass | ||
* [SetOptions]{@link SetOptions}., the provided data can be merged into the | ||
* existing document. | ||
* | ||
* @param documentRef A reference to the document to be | ||
* set. | ||
* @param data The object to serialize as the document. | ||
* @param options An object to configure the set behavior. | ||
* @param options.merge - If true, set() merges the values | ||
* specified in its data argument. Fields omitted from this set() call | ||
* remain untouched. | ||
* @param options.mergeFields - If provided, | ||
* set() only replaces the specified field paths. Any field path that is not | ||
* specified is ignored and remains untouched. | ||
* @returns A promise that resolves with the result | ||
* of the write. Throws an error if the write fails. | ||
*/ | ||
set<T>( | ||
documentRef: DocumentReference<T>, | ||
data: Partial<T>, | ||
options: SetOptions | ||
): Promise<WriteResult>; | ||
set<T>(documentRef: DocumentReference<T>, data: T): Promise<WriteResult>; | ||
/** | ||
* Update fields of the document referred to by the provided | ||
* [DocumentReference]{@link DocumentReference}. If the document doesn't yet | ||
* exist, the update fails and the entire batch will be rejected. | ||
* | ||
* The update() method accepts either an object with field paths encoded as | ||
* keys and field values encoded as values, or a variable number of | ||
* arguments that alternate between field paths and field values. Nested | ||
* fields can be updated by providing dot-separated field path strings or by | ||
* providing FieldPath objects. | ||
* | ||
* | ||
* A Precondition restricting this update can be specified as the last | ||
* argument. | ||
* | ||
* @param documentRef A reference to the document to be | ||
* updated. | ||
* @param dataOrField An object containing the | ||
* fields and values with which to update the document or the path of the | ||
* first field to update. | ||
* @param preconditionOrValues - An | ||
* alternating list of field paths and values to update or a Precondition to | ||
* restrict this update | ||
* @returns A promise that resolves with the result | ||
* of the write. Throws an error if the write fails. | ||
*/ | ||
update( | ||
documentRef: DocumentReference, | ||
dataOrField: UpdateData | string | FieldPath, | ||
...preconditionOrValues: Array< | ||
{lastUpdateTime?: Timestamp} | unknown | string | FieldPath | ||
> | ||
): Promise<WriteResult>; | ||
/** | ||
* Commits all writes that have been enqueued up to this point in parallel. | ||
* | ||
* Returns a Promise that resolves when all currently queued operations have | ||
* been committed. The Promise will never be rejected since the results for | ||
* each individual operation are conveyed via their individual Promises. | ||
* | ||
* The Promise resolves immediately if there are no pending writes. | ||
* Otherwise, the Promise waits for all previously issued writes, but it | ||
* does not wait for writes that were added after the method is called. If | ||
* you want to wait for additional writes, call `flush()` again. | ||
* | ||
* @return A promise that resolves when all enqueued writes | ||
* up to this point have been committed. | ||
*/ | ||
flush(): Promise<void>; | ||
/** | ||
* Commits all enqueued writes and marks the BulkWriter instance as closed. | ||
* | ||
* After calling `close()`, calling any method wil throw an error. | ||
* | ||
* Returns a Promise that resolves when all writes have been committed. The | ||
* Promise will never be rejected. Calling this method will send all | ||
* requests. The promise resolves immediately if there are no pending | ||
* writes. | ||
* | ||
* @return A promise that resolves when all enqueued writes | ||
* up to this point have been committed. | ||
*/ | ||
close(): Promise<void>; | ||
} | ||
/** | ||
* An options object that can be used to disable request throttling in | ||
* BulkWriter. | ||
*/ | ||
export interface BulkWriterOptions { | ||
/** Whether to disable throttling. */ | ||
readonly disableThrottling?: boolean; | ||
} | ||
/** | ||
* A write batch, used to perform multiple writes as a single atomic unit. | ||
@@ -454,0 +599,0 @@ * |
Sorry, the diff of this file is too big to display
Sorry, the diff of this file is too big to display
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is too big to display
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
4393479
81776