Comparing version 6.9.0-dev.20241003.sha.91f30357 to 6.9.0-dev.20241010.sha.6ecf198f
@@ -5,3 +5,2 @@ "use strict"; | ||
exports.mergeBatchResults = mergeBatchResults; | ||
const util_1 = require("util"); | ||
const bson_1 = require("../bson"); | ||
@@ -287,52 +286,32 @@ const error_1 = require("../error"); | ||
} | ||
function executeCommands(bulkOperation, options, callback) { | ||
async function executeCommands(bulkOperation, options) { | ||
if (bulkOperation.s.batches.length === 0) { | ||
return callback(undefined, new BulkWriteResult(bulkOperation.s.bulkResult, bulkOperation.isOrdered)); | ||
return new BulkWriteResult(bulkOperation.s.bulkResult, bulkOperation.isOrdered); | ||
} | ||
const batch = bulkOperation.s.batches.shift(); | ||
function resultHandler(err, result) { | ||
// Error is a driver related error not a bulk op error, return early | ||
if (err && 'message' in err && !(err instanceof error_1.MongoWriteConcernError)) { | ||
return callback(new MongoBulkWriteError(err, new BulkWriteResult(bulkOperation.s.bulkResult, bulkOperation.isOrdered))); | ||
for (const batch of bulkOperation.s.batches) { | ||
const finalOptions = (0, utils_1.resolveOptions)(bulkOperation, { | ||
...options, | ||
ordered: bulkOperation.isOrdered | ||
}); | ||
if (finalOptions.bypassDocumentValidation !== true) { | ||
delete finalOptions.bypassDocumentValidation; | ||
} | ||
if (err instanceof error_1.MongoWriteConcernError) { | ||
return handleMongoWriteConcernError(batch, bulkOperation.s.bulkResult, bulkOperation.isOrdered, err, callback); | ||
// Is the bypassDocumentValidation options specific | ||
if (bulkOperation.s.bypassDocumentValidation === true) { | ||
finalOptions.bypassDocumentValidation = true; | ||
} | ||
// Merge the results together | ||
mergeBatchResults(batch, bulkOperation.s.bulkResult, err, result); | ||
const writeResult = new BulkWriteResult(bulkOperation.s.bulkResult, bulkOperation.isOrdered); | ||
if (bulkOperation.handleWriteError(callback, writeResult)) | ||
return; | ||
// Execute the next command in line | ||
executeCommands(bulkOperation, options, callback); | ||
} | ||
const finalOptions = (0, utils_1.resolveOptions)(bulkOperation, { | ||
...options, | ||
ordered: bulkOperation.isOrdered | ||
}); | ||
if (finalOptions.bypassDocumentValidation !== true) { | ||
delete finalOptions.bypassDocumentValidation; | ||
} | ||
// Set an operationIf if provided | ||
if (bulkOperation.operationId) { | ||
resultHandler.operationId = bulkOperation.operationId; | ||
} | ||
// Is the bypassDocumentValidation options specific | ||
if (bulkOperation.s.bypassDocumentValidation === true) { | ||
finalOptions.bypassDocumentValidation = true; | ||
} | ||
// Is the checkKeys option disabled | ||
if (bulkOperation.s.checkKeys === false) { | ||
finalOptions.checkKeys = false; | ||
} | ||
if (finalOptions.retryWrites) { | ||
if (isUpdateBatch(batch)) { | ||
finalOptions.retryWrites = finalOptions.retryWrites && !batch.operations.some(op => op.multi); | ||
// Is the checkKeys option disabled | ||
if (bulkOperation.s.checkKeys === false) { | ||
finalOptions.checkKeys = false; | ||
} | ||
if (isDeleteBatch(batch)) { | ||
finalOptions.retryWrites = | ||
finalOptions.retryWrites && !batch.operations.some(op => op.limit === 0); | ||
if (finalOptions.retryWrites) { | ||
if (isUpdateBatch(batch)) { | ||
finalOptions.retryWrites = | ||
finalOptions.retryWrites && !batch.operations.some(op => op.multi); | ||
} | ||
if (isDeleteBatch(batch)) { | ||
finalOptions.retryWrites = | ||
finalOptions.retryWrites && !batch.operations.some(op => op.limit === 0); | ||
} | ||
} | ||
} | ||
try { | ||
const operation = isInsertBatch(batch) | ||
@@ -345,21 +324,35 @@ ? new insert_1.InsertOperation(bulkOperation.s.namespace, batch.operations, finalOptions) | ||
: null; | ||
if (operation != null) { | ||
(0, execute_operation_1.executeOperation)(bulkOperation.s.collection.client, operation).then(result => resultHandler(undefined, result), error => resultHandler(error)); | ||
if (operation == null) | ||
throw new error_1.MongoRuntimeError(`Unknown batchType: ${batch.batchType}`); | ||
let thrownError = null; | ||
let result; | ||
try { | ||
result = await (0, execute_operation_1.executeOperation)(bulkOperation.s.collection.client, operation); | ||
} | ||
catch (error) { | ||
thrownError = error; | ||
} | ||
if (thrownError != null) { | ||
if (thrownError instanceof error_1.MongoWriteConcernError) { | ||
mergeBatchResults(batch, bulkOperation.s.bulkResult, thrownError, result); | ||
const writeResult = new BulkWriteResult(bulkOperation.s.bulkResult, bulkOperation.isOrdered); | ||
throw new MongoBulkWriteError({ | ||
message: thrownError.result.writeConcernError.errmsg, | ||
code: thrownError.result.writeConcernError.code | ||
}, writeResult); | ||
} | ||
else { | ||
// Error is a driver related error not a bulk op error, return early | ||
throw new MongoBulkWriteError(thrownError, new BulkWriteResult(bulkOperation.s.bulkResult, bulkOperation.isOrdered)); | ||
} | ||
} | ||
mergeBatchResults(batch, bulkOperation.s.bulkResult, thrownError, result); | ||
const writeResult = new BulkWriteResult(bulkOperation.s.bulkResult, bulkOperation.isOrdered); | ||
bulkOperation.handleWriteError(writeResult); | ||
} | ||
catch (err) { | ||
// Force top level error | ||
err.ok = 0; | ||
// Merge top level error and return | ||
mergeBatchResults(batch, bulkOperation.s.bulkResult, err, undefined); | ||
callback(); | ||
} | ||
bulkOperation.s.batches.length = 0; | ||
const writeResult = new BulkWriteResult(bulkOperation.s.bulkResult, bulkOperation.isOrdered); | ||
bulkOperation.handleWriteError(writeResult); | ||
return writeResult; | ||
} | ||
function handleMongoWriteConcernError(batch, bulkResult, isOrdered, err, callback) { | ||
mergeBatchResults(batch, bulkResult, undefined, err.result); | ||
callback(new MongoBulkWriteError({ | ||
message: err.result.writeConcernError.errmsg, | ||
code: err.result.writeConcernError.code | ||
}, new BulkWriteResult(bulkResult, isOrdered))); | ||
} | ||
/** | ||
@@ -510,3 +503,2 @@ * An error indicating an unsuccessful Bulk Write | ||
exports.FindOperators = FindOperators; | ||
const executeCommandsAsync = (0, util_1.promisify)(executeCommands); | ||
/** | ||
@@ -527,3 +519,3 @@ * TODO(NODE-4063) | ||
} | ||
execute(_server, session) { | ||
async execute(_server, session) { | ||
if (this.options.session == null) { | ||
@@ -536,3 +528,3 @@ // An implicit session could have been created by 'executeOperation' | ||
} | ||
return executeCommandsAsync(this.bulkOperation, this.options); | ||
return await executeCommands(this.bulkOperation, this.options); | ||
} | ||
@@ -828,3 +820,3 @@ } | ||
*/ | ||
handleWriteError(callback, writeResult) { | ||
handleWriteError(writeResult) { | ||
if (this.s.bulkResult.writeErrors.length > 0) { | ||
@@ -834,15 +826,12 @@ const msg = this.s.bulkResult.writeErrors[0].errmsg | ||
: 'write operation failed'; | ||
callback(new MongoBulkWriteError({ | ||
throw new MongoBulkWriteError({ | ||
message: msg, | ||
code: this.s.bulkResult.writeErrors[0].code, | ||
writeErrors: this.s.bulkResult.writeErrors | ||
}, writeResult)); | ||
return true; | ||
}, writeResult); | ||
} | ||
const writeConcernError = writeResult.getWriteConcernError(); | ||
if (writeConcernError) { | ||
callback(new MongoBulkWriteError(writeConcernError, writeResult)); | ||
return true; | ||
throw new MongoBulkWriteError(writeConcernError, writeResult); | ||
} | ||
return false; | ||
} | ||
@@ -849,0 +838,0 @@ shouldForceServerObjectId() { |
@@ -13,7 +13,7 @@ "use strict"; | ||
} | ||
handleWriteError(callback, writeResult) { | ||
handleWriteError(writeResult) { | ||
if (this.s.batches.length) { | ||
return false; | ||
return; | ||
} | ||
return super.handleWriteError(callback, writeResult); | ||
return super.handleWriteError(writeResult); | ||
} | ||
@@ -20,0 +20,0 @@ addToOperationsList(batchType, document) { |
@@ -334,3 +334,3 @@ "use strict"; | ||
return; | ||
if ((0, error_1.isResumableError)(changeStreamError, this.cursor.maxWireVersion)) { | ||
if (this.cursor.id != null && (0, error_1.isResumableError)(changeStreamError, this.cursor.maxWireVersion)) { | ||
this._endStream(); | ||
@@ -357,3 +357,4 @@ this.cursor.close().then(undefined, utils_1.squashError); | ||
} | ||
if (!(0, error_1.isResumableError)(changeStreamError, this.cursor.maxWireVersion)) { | ||
if (this.cursor.id == null || | ||
!(0, error_1.isResumableError)(changeStreamError, this.cursor.maxWireVersion)) { | ||
try { | ||
@@ -360,0 +361,0 @@ await this.close(); |
{ | ||
"name": "mongodb", | ||
"version": "6.9.0-dev.20241003.sha.91f30357", | ||
"version": "6.9.0-dev.20241010.sha.6ecf198f", | ||
"description": "The official MongoDB driver for Node.js", | ||
@@ -5,0 +5,0 @@ "main": "lib/index.js", |
@@ -1,3 +0,1 @@ | ||
import { promisify } from 'util'; | ||
import { type BSONSerializeOptions, type Document, EJSON, resolveBSONOptions } from '../bson'; | ||
@@ -10,2 +8,3 @@ import type { Collection } from '../collection'; | ||
MongoInvalidArgumentError, | ||
MongoRuntimeError, | ||
MongoServerError, | ||
@@ -26,3 +25,2 @@ MongoWriteConcernError | ||
applyRetryableWrites, | ||
type Callback, | ||
getTopology, | ||
@@ -505,82 +503,42 @@ hasAtomicOperators, | ||
function executeCommands( | ||
async function executeCommands( | ||
bulkOperation: BulkOperationBase, | ||
options: BulkWriteOptions, | ||
callback: Callback<BulkWriteResult> | ||
) { | ||
options: BulkWriteOptions | ||
): Promise<BulkWriteResult> { | ||
if (bulkOperation.s.batches.length === 0) { | ||
return callback( | ||
undefined, | ||
new BulkWriteResult(bulkOperation.s.bulkResult, bulkOperation.isOrdered) | ||
); | ||
return new BulkWriteResult(bulkOperation.s.bulkResult, bulkOperation.isOrdered); | ||
} | ||
const batch = bulkOperation.s.batches.shift() as Batch; | ||
for (const batch of bulkOperation.s.batches) { | ||
const finalOptions = resolveOptions(bulkOperation, { | ||
...options, | ||
ordered: bulkOperation.isOrdered | ||
}); | ||
function resultHandler(err?: AnyError, result?: Document) { | ||
// Error is a driver related error not a bulk op error, return early | ||
if (err && 'message' in err && !(err instanceof MongoWriteConcernError)) { | ||
return callback( | ||
new MongoBulkWriteError( | ||
err, | ||
new BulkWriteResult(bulkOperation.s.bulkResult, bulkOperation.isOrdered) | ||
) | ||
); | ||
if (finalOptions.bypassDocumentValidation !== true) { | ||
delete finalOptions.bypassDocumentValidation; | ||
} | ||
if (err instanceof MongoWriteConcernError) { | ||
return handleMongoWriteConcernError( | ||
batch, | ||
bulkOperation.s.bulkResult, | ||
bulkOperation.isOrdered, | ||
err, | ||
callback | ||
); | ||
// Is the bypassDocumentValidation options specific | ||
if (bulkOperation.s.bypassDocumentValidation === true) { | ||
finalOptions.bypassDocumentValidation = true; | ||
} | ||
// Merge the results together | ||
mergeBatchResults(batch, bulkOperation.s.bulkResult, err, result); | ||
const writeResult = new BulkWriteResult(bulkOperation.s.bulkResult, bulkOperation.isOrdered); | ||
if (bulkOperation.handleWriteError(callback, writeResult)) return; | ||
// Is the checkKeys option disabled | ||
if (bulkOperation.s.checkKeys === false) { | ||
finalOptions.checkKeys = false; | ||
} | ||
// Execute the next command in line | ||
executeCommands(bulkOperation, options, callback); | ||
} | ||
if (finalOptions.retryWrites) { | ||
if (isUpdateBatch(batch)) { | ||
finalOptions.retryWrites = | ||
finalOptions.retryWrites && !batch.operations.some(op => op.multi); | ||
} | ||
const finalOptions = resolveOptions(bulkOperation, { | ||
...options, | ||
ordered: bulkOperation.isOrdered | ||
}); | ||
if (finalOptions.bypassDocumentValidation !== true) { | ||
delete finalOptions.bypassDocumentValidation; | ||
} | ||
// Set an operationIf if provided | ||
if (bulkOperation.operationId) { | ||
resultHandler.operationId = bulkOperation.operationId; | ||
} | ||
// Is the bypassDocumentValidation options specific | ||
if (bulkOperation.s.bypassDocumentValidation === true) { | ||
finalOptions.bypassDocumentValidation = true; | ||
} | ||
// Is the checkKeys option disabled | ||
if (bulkOperation.s.checkKeys === false) { | ||
finalOptions.checkKeys = false; | ||
} | ||
if (finalOptions.retryWrites) { | ||
if (isUpdateBatch(batch)) { | ||
finalOptions.retryWrites = finalOptions.retryWrites && !batch.operations.some(op => op.multi); | ||
if (isDeleteBatch(batch)) { | ||
finalOptions.retryWrites = | ||
finalOptions.retryWrites && !batch.operations.some(op => op.limit === 0); | ||
} | ||
} | ||
if (isDeleteBatch(batch)) { | ||
finalOptions.retryWrites = | ||
finalOptions.retryWrites && !batch.operations.some(op => op.limit === 0); | ||
} | ||
} | ||
try { | ||
const operation = isInsertBatch(batch) | ||
@@ -594,35 +552,46 @@ ? new InsertOperation(bulkOperation.s.namespace, batch.operations, finalOptions) | ||
if (operation != null) { | ||
executeOperation(bulkOperation.s.collection.client, operation).then( | ||
result => resultHandler(undefined, result), | ||
error => resultHandler(error) | ||
); | ||
if (operation == null) throw new MongoRuntimeError(`Unknown batchType: ${batch.batchType}`); | ||
let thrownError = null; | ||
let result; | ||
try { | ||
result = await executeOperation(bulkOperation.s.collection.client, operation); | ||
} catch (error) { | ||
thrownError = error; | ||
} | ||
} catch (err) { | ||
// Force top level error | ||
err.ok = 0; | ||
// Merge top level error and return | ||
mergeBatchResults(batch, bulkOperation.s.bulkResult, err, undefined); | ||
callback(); | ||
if (thrownError != null) { | ||
if (thrownError instanceof MongoWriteConcernError) { | ||
mergeBatchResults(batch, bulkOperation.s.bulkResult, thrownError, result); | ||
const writeResult = new BulkWriteResult( | ||
bulkOperation.s.bulkResult, | ||
bulkOperation.isOrdered | ||
); | ||
throw new MongoBulkWriteError( | ||
{ | ||
message: thrownError.result.writeConcernError.errmsg, | ||
code: thrownError.result.writeConcernError.code | ||
}, | ||
writeResult | ||
); | ||
} else { | ||
// Error is a driver related error not a bulk op error, return early | ||
throw new MongoBulkWriteError( | ||
thrownError, | ||
new BulkWriteResult(bulkOperation.s.bulkResult, bulkOperation.isOrdered) | ||
); | ||
} | ||
} | ||
mergeBatchResults(batch, bulkOperation.s.bulkResult, thrownError, result); | ||
const writeResult = new BulkWriteResult(bulkOperation.s.bulkResult, bulkOperation.isOrdered); | ||
bulkOperation.handleWriteError(writeResult); | ||
} | ||
} | ||
function handleMongoWriteConcernError( | ||
batch: Batch, | ||
bulkResult: BulkResult, | ||
isOrdered: boolean, | ||
err: MongoWriteConcernError, | ||
callback: Callback<BulkWriteResult> | ||
) { | ||
mergeBatchResults(batch, bulkResult, undefined, err.result); | ||
bulkOperation.s.batches.length = 0; | ||
callback( | ||
new MongoBulkWriteError( | ||
{ | ||
message: err.result.writeConcernError.errmsg, | ||
code: err.result.writeConcernError.code | ||
}, | ||
new BulkWriteResult(bulkResult, isOrdered) | ||
) | ||
); | ||
const writeResult = new BulkWriteResult(bulkOperation.s.bulkResult, bulkOperation.isOrdered); | ||
bulkOperation.handleWriteError(writeResult); | ||
return writeResult; | ||
} | ||
@@ -882,4 +851,2 @@ | ||
const executeCommandsAsync = promisify(executeCommands); | ||
/** | ||
@@ -903,3 +870,3 @@ * TODO(NODE-4063) | ||
execute(_server: Server, session: ClientSession | undefined): Promise<any> { | ||
async execute(_server: Server, session: ClientSession | undefined): Promise<any> { | ||
if (this.options.session == null) { | ||
@@ -912,3 +879,3 @@ // An implicit session could have been created by 'executeOperation' | ||
} | ||
return executeCommandsAsync(this.bulkOperation, this.options); | ||
return await executeCommands(this.bulkOperation, this.options); | ||
} | ||
@@ -1249,3 +1216,3 @@ } | ||
*/ | ||
handleWriteError(callback: Callback<BulkWriteResult>, writeResult: BulkWriteResult): boolean { | ||
handleWriteError(writeResult: BulkWriteResult): void { | ||
if (this.s.bulkResult.writeErrors.length > 0) { | ||
@@ -1256,14 +1223,10 @@ const msg = this.s.bulkResult.writeErrors[0].errmsg | ||
callback( | ||
new MongoBulkWriteError( | ||
{ | ||
message: msg, | ||
code: this.s.bulkResult.writeErrors[0].code, | ||
writeErrors: this.s.bulkResult.writeErrors | ||
}, | ||
writeResult | ||
) | ||
throw new MongoBulkWriteError( | ||
{ | ||
message: msg, | ||
code: this.s.bulkResult.writeErrors[0].code, | ||
writeErrors: this.s.bulkResult.writeErrors | ||
}, | ||
writeResult | ||
); | ||
return true; | ||
} | ||
@@ -1273,7 +1236,4 @@ | ||
if (writeConcernError) { | ||
callback(new MongoBulkWriteError(writeConcernError, writeResult)); | ||
return true; | ||
throw new MongoBulkWriteError(writeConcernError, writeResult); | ||
} | ||
return false; | ||
} | ||
@@ -1280,0 +1240,0 @@ |
@@ -7,3 +7,2 @@ import type { Document } from '../bson'; | ||
import type { UpdateStatement } from '../operations/update'; | ||
import { type Callback } from '../utils'; | ||
import { | ||
@@ -24,8 +23,8 @@ Batch, | ||
override handleWriteError(callback: Callback, writeResult: BulkWriteResult): boolean { | ||
override handleWriteError(writeResult: BulkWriteResult): void { | ||
if (this.s.batches.length) { | ||
return false; | ||
return; | ||
} | ||
return super.handleWriteError(callback, writeResult); | ||
return super.handleWriteError(writeResult); | ||
} | ||
@@ -32,0 +31,0 @@ |
@@ -949,3 +949,3 @@ import type { Readable } from 'stream'; | ||
if (isResumableError(changeStreamError, this.cursor.maxWireVersion)) { | ||
if (this.cursor.id != null && isResumableError(changeStreamError, this.cursor.maxWireVersion)) { | ||
this._endStream(); | ||
@@ -979,3 +979,6 @@ | ||
if (!isResumableError(changeStreamError, this.cursor.maxWireVersion)) { | ||
if ( | ||
this.cursor.id == null || | ||
!isResumableError(changeStreamError, this.cursor.maxWireVersion) | ||
) { | ||
try { | ||
@@ -982,0 +985,0 @@ await this.close(); |
Sorry, the diff of this file is too big to display
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is too big to display
22
28
6
168
3484117
72832