mongodb-core
Advanced tools
Comparing version 3.1.0-beta4 to 3.1.0
@@ -0,1 +1,44 @@ | ||
<a name="3.1.0"></a> | ||
# [3.1.0](https://github.com/mongodb-js/mongodb-core/compare/v3.0.6...v3.1.0) (2018-06-27) | ||
### Bug Fixes | ||
* **auth:** prevent stalling on authentication when connected ([6b4ac89](https://github.com/mongodb-js/mongodb-core/commit/6b4ac89)) | ||
* **cursor:** check for session presence independently ([7c76c62](https://github.com/mongodb-js/mongodb-core/commit/7c76c62)) | ||
* **cursor:** check for sessions independently in core cursor ([cb5df28](https://github.com/mongodb-js/mongodb-core/commit/cb5df28)) | ||
* **error:** attach command response to MongoWriteConcernError ([#322](https://github.com/mongodb-js/mongodb-core/issues/322)) ([24c5d06](https://github.com/mongodb-js/mongodb-core/commit/24c5d06)) | ||
* **getmore-killcursor:** slaveOk shall not be included on these ([40fb2f4](https://github.com/mongodb-js/mongodb-core/commit/40fb2f4)) | ||
* **kerberos:** loosen restrictions on kerberos versions ([c4add26](https://github.com/mongodb-js/mongodb-core/commit/c4add26)) | ||
* **mongos:** use `incrementTransactionNumber` directly on session ([e230d54](https://github.com/mongodb-js/mongodb-core/commit/e230d54)) | ||
* **pool:** ensure that lsid is sent in get requests to mongos ([ae820f6](https://github.com/mongodb-js/mongodb-core/commit/ae820f6)) | ||
* **sdam:** we can't use Array.includes yet ([9c3b5ab](https://github.com/mongodb-js/mongodb-core/commit/9c3b5ab)) | ||
* **sessions:** add `toBSON` method to `ClientSession` ([d95a4d1](https://github.com/mongodb-js/mongodb-core/commit/d95a4d1)) | ||
* **sessions:** never send `endSessions` from a `ClientSession` ([05ffe82](https://github.com/mongodb-js/mongodb-core/commit/05ffe82)) | ||
* **topology-description:** we can't use Object.values yet ([91df350](https://github.com/mongodb-js/mongodb-core/commit/91df350)) | ||
* **transactions:** do not send txnNumber for non-write commands ([#308](https://github.com/mongodb-js/mongodb-core/issues/308)) ([eb67b1a](https://github.com/mongodb-js/mongodb-core/commit/eb67b1a)) | ||
* **wireprotocol:** only send bypassDocumentValidation if true ([a81678b](https://github.com/mongodb-js/mongodb-core/commit/a81678b)) | ||
### Features | ||
* **auth:** adds saslprep and SCRAM-SHA-256 ([506c087](https://github.com/mongodb-js/mongodb-core/commit/506c087)) | ||
* **cursor:** implement cursor for new sdam implementation ([f289226](https://github.com/mongodb-js/mongodb-core/commit/f289226)) | ||
* **cursor:** store operation time from initial query ([55e761e](https://github.com/mongodb-js/mongodb-core/commit/55e761e)) | ||
* **error:** add more specific error type for write concern errors ([347c5d7](https://github.com/mongodb-js/mongodb-core/commit/347c5d7)) | ||
* **Error:** adding error metadata field ([33be560](https://github.com/mongodb-js/mongodb-core/commit/33be560)) | ||
* **kerberos:** expose warning for kerberos mismatch versions ([efc0e43](https://github.com/mongodb-js/mongodb-core/commit/efc0e43)) | ||
* **max-staleness:** properly support a max staleness reducer ([d9c5c16](https://github.com/mongodb-js/mongodb-core/commit/d9c5c16)) | ||
* **MongoTimeoutError:** add common class for timeout events ([c5b4752](https://github.com/mongodb-js/mongodb-core/commit/c5b4752)) | ||
* **op-compressed:** add support for OP_COMPRESSED to new sdam impl ([8deec9b](https://github.com/mongodb-js/mongodb-core/commit/8deec9b)) | ||
* **retryableWrites:** adding more support for retries ([d4c1597](https://github.com/mongodb-js/mongodb-core/commit/d4c1597)) | ||
* **sdam-monitoring:** add basic monitoring for new Topology type ([bb0c522](https://github.com/mongodb-js/mongodb-core/commit/bb0c522)) | ||
* **server:** add `command` support to new server class ([d9a8c05](https://github.com/mongodb-js/mongodb-core/commit/d9a8c05)) | ||
* **server-selection:** add basic support for server selection ([ccc5e1d](https://github.com/mongodb-js/mongodb-core/commit/ccc5e1d)) | ||
* **topology:** introduce a single Topology type, and test runner ([f35d773](https://github.com/mongodb-js/mongodb-core/commit/f35d773)) | ||
* **topology-description:** add helper method for server ownership ([2c64c75](https://github.com/mongodb-js/mongodb-core/commit/2c64c75)) | ||
* **txns:** add initial transaction interface for sessions ([ed76be0](https://github.com/mongodb-js/mongodb-core/commit/ed76be0)) | ||
<a name="3.0.6"></a> | ||
@@ -2,0 +45,0 @@ ## [3.0.6](https://github.com/mongodb-js/mongodb-core/compare/v3.0.5...v3.0.6) (2018-04-09) |
@@ -17,4 +17,10 @@ 'use strict'; | ||
module.exports = { | ||
// Errors | ||
MongoError: require('./lib/error').MongoError, | ||
MongoNetworkError: require('./lib/error').MongoNetworkError, | ||
MongoParseError: require('./lib/error').MongoParseError, | ||
MongoTimeoutError: require('./lib/error').MongoTimeoutError, | ||
MongoWriteConcernError: require('./lib/error').MongoWriteConcernError, | ||
mongoErrorContextSymbol: require('./lib/error').mongoErrorContextSymbol, | ||
// Core | ||
Connection: require('./lib/connection/connection'), | ||
@@ -21,0 +27,0 @@ Server: require('./lib/topologies/server'), |
'use strict'; | ||
var f = require('util').format, | ||
require_optional = require('require_optional'), | ||
Query = require('../connection/commands').Query, | ||
MongoError = require('../error').MongoError; | ||
const f = require('util').format; | ||
const Kerberos = require('../utils').Kerberos; | ||
const MongoAuthProcess = require('../utils').MongoAuthProcess; | ||
const Query = require('../connection/commands').Query; | ||
const MongoError = require('../error').MongoError; | ||
@@ -23,13 +24,2 @@ var AuthSession = function(db, username, password, options) { | ||
// Kerberos class | ||
var Kerberos = null; | ||
var MongoAuthProcess = null; | ||
// Try to grab the Kerberos class | ||
try { | ||
Kerberos = require_optional('kerberos').Kerberos; | ||
// Authentication process for Mongo | ||
MongoAuthProcess = require_optional('kerberos').processes.MongoAuthProcess; | ||
} catch (err) {} // eslint-disable-line | ||
/** | ||
@@ -36,0 +26,0 @@ * Creates a new GSSAPI authentication mechanism |
'use strict'; | ||
var f = require('util').format, | ||
require_optional = require('require_optional'), | ||
Query = require('../connection/commands').Query, | ||
MongoError = require('../error').MongoError; | ||
const f = require('util').format; | ||
const Kerberos = require('../utils').Kerberos; | ||
const MongoAuthProcess = require('../utils').MongoAuthProcess; | ||
const Query = require('../connection/commands').Query; | ||
const MongoError = require('../error').MongoError; | ||
@@ -23,13 +24,2 @@ var AuthSession = function(db, username, password, options) { | ||
// Kerberos class | ||
var Kerberos = null; | ||
var MongoAuthProcess = null; | ||
// Try to grab the Kerberos class | ||
try { | ||
Kerberos = require_optional('kerberos').Kerberos; | ||
// Authentication process for Mongo | ||
MongoAuthProcess = require_optional('kerberos').processes.MongoAuthProcess; | ||
} catch (err) {} // eslint-disable-line | ||
/** | ||
@@ -36,0 +26,0 @@ * Creates a new SSPI authentication mechanism |
'use strict'; | ||
const KillCursor = require('../connection/commands').KillCursor; | ||
const GetMore = require('../connection/commands').GetMore; | ||
const process = require('process'); | ||
const calculateDurationInMs = require('../utils').calculateDurationInMs; | ||
@@ -21,7 +21,2 @@ /** Commands that we want to redact because of the sensitive nature of their contents */ | ||
const extractCommandName = command => Object.keys(command)[0]; | ||
const calculateDurationInMs = started => { | ||
const hrtime = process.hrtime(started); | ||
return (hrtime[0] * 1e9 + hrtime[1]) / 1e6; | ||
}; | ||
const namespace = command => command.ns; | ||
@@ -28,0 +23,0 @@ const databaseName = command => command.ns.split('.')[0]; |
'use strict'; | ||
var inherits = require('util').inherits, | ||
EventEmitter = require('events').EventEmitter, | ||
Connection = require('./connection'), | ||
MongoError = require('../error').MongoError, | ||
MongoNetworkError = require('../error').MongoNetworkError, | ||
Logger = require('./logger'), | ||
f = require('util').format, | ||
Query = require('./commands').Query, | ||
CommandResult = require('./command_result'), | ||
MESSAGE_HEADER_SIZE = require('../wireprotocol/shared').MESSAGE_HEADER_SIZE, | ||
opcodes = require('../wireprotocol/shared').opcodes, | ||
compress = require('../wireprotocol/compression').compress, | ||
compressorIDs = require('../wireprotocol/compression').compressorIDs, | ||
uncompressibleCommands = require('../wireprotocol/compression').uncompressibleCommands, | ||
resolveClusterTime = require('../topologies/shared').resolveClusterTime; | ||
const inherits = require('util').inherits; | ||
const EventEmitter = require('events').EventEmitter; | ||
const Connection = require('./connection'); | ||
const MongoError = require('../error').MongoError; | ||
const MongoNetworkError = require('../error').MongoNetworkError; | ||
const MongoWriteConcernError = require('../error').MongoWriteConcernError; | ||
const Logger = require('./logger'); | ||
const f = require('util').format; | ||
const Query = require('./commands').Query; | ||
const CommandResult = require('./command_result'); | ||
const MESSAGE_HEADER_SIZE = require('../wireprotocol/shared').MESSAGE_HEADER_SIZE; | ||
const opcodes = require('../wireprotocol/shared').opcodes; | ||
const compress = require('../wireprotocol/compression').compress; | ||
const compressorIDs = require('../wireprotocol/compression').compressorIDs; | ||
const uncompressibleCommands = require('../wireprotocol/compression').uncompressibleCommands; | ||
const resolveClusterTime = require('../topologies/shared').resolveClusterTime; | ||
const apm = require('./apm'); | ||
const defaultAuthProviders = require('../auth/defaultAuthProviders').defaultAuthProviders; | ||
@@ -578,11 +577,15 @@ | ||
// Establish if we have an error | ||
if ( | ||
workItem.command && | ||
message.documents[0] && | ||
(message.documents[0].ok === 0 || | ||
message.documents[0]['$err'] || | ||
message.documents[0]['errmsg'] || | ||
message.documents[0]['code']) | ||
) { | ||
return handleOperationCallback(self, workItem.cb, new MongoError(message.documents[0])); | ||
if (workItem.command && message.documents[0]) { | ||
const responseDoc = message.documents[0]; | ||
if (responseDoc.ok === 0 || responseDoc.$err || responseDoc.errmsg || responseDoc.code) { | ||
return handleOperationCallback(self, workItem.cb, new MongoError(responseDoc)); | ||
} | ||
if (responseDoc.writeConcernError) { | ||
const err = | ||
responseDoc.ok === 1 | ||
? new MongoWriteConcernError(responseDoc.writeConcernError, responseDoc) | ||
: new MongoWriteConcernError(responseDoc.writeConcernError); | ||
return handleOperationCallback(self, workItem.cb, err); | ||
} | ||
} | ||
@@ -589,0 +592,0 @@ |
'use strict'; | ||
var Logger = require('./connection/logger'), | ||
retrieveBSON = require('./connection/utils').retrieveBSON, | ||
MongoError = require('./error').MongoError, | ||
MongoNetworkError = require('./error').MongoNetworkError, | ||
f = require('util').format; | ||
const Logger = require('./connection/logger'); | ||
const retrieveBSON = require('./connection/utils').retrieveBSON; | ||
const MongoError = require('./error').MongoError; | ||
const MongoNetworkError = require('./error').MongoNetworkError; | ||
const mongoErrorContextSymbol = require('./error').mongoErrorContextSymbol; | ||
const f = require('util').format; | ||
@@ -109,2 +110,6 @@ var BSON = retrieveBSON(), | ||
if (typeof options.session === 'object') { | ||
this.cursorState.session = options.session; | ||
} | ||
// Add promoteLong to cursor state | ||
@@ -115,4 +120,2 @@ if (typeof topologyOptions.promoteLongs === 'boolean') { | ||
this.cursorState.promoteLongs = options.promoteLongs; | ||
} else if (typeof options.session === 'object') { | ||
this.cursorState.session = options.session; | ||
} | ||
@@ -259,2 +262,3 @@ | ||
self.cursorState.lastCursorId = self.cursorState.cursorId; | ||
self.cursorState.operationTime = result.documents[0].operationTime; | ||
// If we have a firstBatch set it | ||
@@ -650,2 +654,6 @@ if (Array.isArray(result.documents[0].cursor.firstBatch)) { | ||
); | ||
if (self.query instanceof MongoError) { | ||
return callback(self.query); | ||
} | ||
} catch (err) { | ||
@@ -716,4 +724,10 @@ return callback(err); | ||
self._getmore(function(err, doc, connection) { | ||
if (err) return handleCallback(callback, err); | ||
if (err) { | ||
if (err instanceof MongoError) { | ||
err[mongoErrorContextSymbol].isGetMore = true; | ||
} | ||
return handleCallback(callback, err); | ||
} | ||
if (self.cursorState.cursorId && self.cursorState.cursorId.isZero() && self._endSession) { | ||
@@ -720,0 +734,0 @@ self._endSession(); |
@@ -5,2 +5,4 @@ 'use strict'; | ||
const mongoErrorContextSymbol = Symbol('mongoErrorContextSymbol'); | ||
/** | ||
@@ -35,2 +37,4 @@ * Creates a new MongoError | ||
} | ||
this[mongoErrorContextSymbol] = this[mongoErrorContextSymbol] || {}; | ||
} | ||
@@ -62,2 +66,5 @@ util.inherits(MongoError, Error); | ||
this.name = 'MongoNetworkError'; | ||
// This is added as part of the transactions specification | ||
this.errorLabels = ['TransientTransactionError']; | ||
}; | ||
@@ -72,3 +79,3 @@ util.inherits(MongoNetworkError, MongoError); | ||
* @property {string} message The error message | ||
* @return {MongoParseError} A MongoNetworkError instance | ||
* @return {MongoParseError} A MongoParseError instance | ||
* @extends {MongoError} | ||
@@ -82,6 +89,74 @@ */ | ||
/** | ||
* An error signifying a timeout event | ||
* | ||
* @class | ||
* @param {Error|string|object} message The error message | ||
* @property {string} message The error message | ||
* @return {MongoTimeoutError} A MongoTimeoutError instance | ||
* @extends {MongoError} | ||
*/ | ||
const MongoTimeoutError = function(message) { | ||
MongoError.call(this, message); | ||
this.name = 'MongoTimeoutError'; | ||
}; | ||
util.inherits(MongoTimeoutError, MongoError); | ||
// see: https://github.com/mongodb/specifications/blob/master/source/retryable-writes/retryable-writes.rst#terms | ||
const RETRYABLE_ERROR_CODES = new Set([ | ||
6, // HostUnreachable | ||
7, // HostNotFound | ||
89, // NetworkTimeout | ||
91, // ShutdownInProgress | ||
189, // PrimarySteppedDown | ||
9001, // SocketException | ||
10107, // NotMaster | ||
11600, // InterruptedAtShutdown | ||
11602, // InterruptedDueToReplStateChange | ||
13435, // NotMasterNoSlaveOk | ||
13436 // NotMasterOrSecondary | ||
]); | ||
function isRetryableError(error) { | ||
if ( | ||
RETRYABLE_ERROR_CODES.has(error.code) || | ||
error instanceof MongoNetworkError || | ||
error.message.match(/not master/) || | ||
error.message.match(/node is recovering/) | ||
) { | ||
return true; | ||
} | ||
return false; | ||
} | ||
/** | ||
* An error thrown when the server reports a writeConcernError | ||
* | ||
* @class | ||
* @param {Error|string|object} message The error message | ||
* @param {object} result The result document (provided if ok: 1) | ||
* @property {string} message The error message | ||
* @property {object} [result] The result document (provided if ok: 1) | ||
* @return {MongoWriteConcernError} A MongoWriteConcernError instance | ||
* @extends {MongoError} | ||
*/ | ||
const MongoWriteConcernError = function(message, result) { | ||
MongoError.call(this, message); | ||
this.name = 'MongoWriteConcernError'; | ||
if (result != null) { | ||
this.result = result; | ||
} | ||
}; | ||
util.inherits(MongoWriteConcernError, MongoError); | ||
module.exports = { | ||
MongoError: MongoError, | ||
MongoNetworkError: MongoNetworkError, | ||
MongoParseError: MongoParseError | ||
MongoError, | ||
MongoNetworkError, | ||
MongoParseError, | ||
MongoTimeoutError, | ||
MongoWriteConcernError, | ||
mongoErrorContextSymbol, | ||
isRetryableError | ||
}; |
@@ -9,3 +9,7 @@ 'use strict'; | ||
const MongoError = require('./error').MongoError; | ||
const isRetryableError = require('././error').isRetryableError; | ||
const MongoNetworkError = require('./error').MongoNetworkError; | ||
const MongoWriteConcernError = require('./error').MongoWriteConcernError; | ||
const Transaction = require('./transactions').Transaction; | ||
const TxnState = require('./transactions').TxnState; | ||
@@ -73,8 +77,14 @@ function assertAlive(session, callback) { | ||
this.owner = options.owner; | ||
this.transactionOptions = null; | ||
this.autoStartTransaction = options.autoStartTransaction; | ||
this.defaultTransactionOptions = Object.assign({}, options.defaultTransactionOptions); | ||
this.transaction = new Transaction(); | ||
} | ||
/** | ||
* Return the server id associated with this session | ||
*/ | ||
get id() { | ||
return this.serverSession.id; | ||
} | ||
/** | ||
* Ends this session on the server | ||
@@ -144,16 +154,6 @@ * | ||
/** | ||
* Increment the statement id on the internal ServerSession | ||
* | ||
* @param {Number} [operationCount] the number of operations performed | ||
*/ | ||
incrementStatementId(operationCount) { | ||
operationCount = operationCount || 1; | ||
this.serverSession.stmtId += operationCount; | ||
} | ||
/** | ||
* @returns whether this session is current in a transaction or not | ||
*/ | ||
inTransaction() { | ||
return this.transactionOptions != null; | ||
return this.transaction.isActive; | ||
} | ||
@@ -174,8 +174,11 @@ | ||
// increment txnNumber and reset stmtId to zero. | ||
this.serverSession.txnNumber += 1; | ||
this.serverSession.stmtId = 0; | ||
// increment txnNumber | ||
this.incrementTransactionNumber(); | ||
// set transaction options, we will use this to determine if we are in a transaction | ||
this.transactionOptions = Object.assign({}, options || this.defaultTransactionOptions); | ||
// create transaction state | ||
this.transaction = new Transaction( | ||
Object.assign({}, this.clientOptions, options || this.defaultTransactionOptions) | ||
); | ||
this.transaction.transition(TxnState.STARTING_TRANSACTION); | ||
} | ||
@@ -224,39 +227,10 @@ | ||
} | ||
} | ||
// see: https://github.com/mongodb/specifications/blob/master/source/retryable-writes/retryable-writes.rst#terms | ||
const RETRYABLE_ERROR_CODES = new Set([ | ||
6, // HostUnreachable | ||
7, // HostNotFound | ||
64, // WriteConcernFailed | ||
89, // NetworkTimeout | ||
91, // ShutdownInProgress | ||
189, // PrimarySteppedDown | ||
9001, // SocketException | ||
11600, // InterruptedAtShutdown | ||
11602, // InterruptedDueToReplStateChange | ||
10107, // NotMaster | ||
13435, // NotMasterNoSlaveOk | ||
13436 // NotMasterOrSecondary | ||
]); | ||
function isRetryableError(error) { | ||
if ( | ||
RETRYABLE_ERROR_CODES.has(error.code) || | ||
error instanceof MongoNetworkError || | ||
error.message.match(/not master/) || | ||
error.message.match(/node is recovering/) | ||
) { | ||
return true; | ||
toBSON() { | ||
throw new Error('ClientSession cannot be serialized to BSON.'); | ||
} | ||
return false; | ||
} | ||
function resetTransactionState(clientSession) { | ||
clientSession.transactionOptions = null; | ||
} | ||
function endTransaction(clientSession, commandName, callback) { | ||
if (!assertAlive(clientSession, callback)) { | ||
function endTransaction(session, commandName, callback) { | ||
if (!assertAlive(session, callback)) { | ||
// checking result in case callback was called | ||
@@ -266,30 +240,86 @@ return; | ||
if (!clientSession.inTransaction()) { | ||
if (clientSession.autoStartTransaction) { | ||
clientSession.startTransaction(); | ||
} else { | ||
callback(new MongoError('No transaction started')); | ||
// handle any initial problematic cases | ||
let txnState = session.transaction.state; | ||
if (txnState === TxnState.NO_TRANSACTION) { | ||
callback(new MongoError('No transaction started')); | ||
return; | ||
} | ||
if (commandName === 'commitTransaction') { | ||
if ( | ||
txnState === TxnState.STARTING_TRANSACTION || | ||
txnState === TxnState.TRANSACTION_COMMITTED_EMPTY | ||
) { | ||
// the transaction was never started, we can safely exit here | ||
session.transaction.transition(TxnState.TRANSACTION_COMMITTED_EMPTY); | ||
callback(null, null); | ||
return; | ||
} | ||
} | ||
if (clientSession.serverSession.stmtId === 0) { | ||
// The server transaction was never started. | ||
resetTransactionState(clientSession); | ||
callback(null, null); | ||
return; | ||
if (txnState === TxnState.TRANSACTION_ABORTED) { | ||
callback(new MongoError('Cannot call commitTransaction after calling abortTransaction')); | ||
return; | ||
} | ||
} else { | ||
if (txnState === TxnState.STARTING_TRANSACTION) { | ||
// the transaction was never started, we can safely exit here | ||
session.transaction.transition(TxnState.TRANSACTION_ABORTED); | ||
callback(null, null); | ||
return; | ||
} | ||
if (txnState === TxnState.TRANSACTION_ABORTED) { | ||
callback(new MongoError('Cannot call abortTransaction twice')); | ||
return; | ||
} | ||
if ( | ||
txnState === TxnState.TRANSACTION_COMMITTED || | ||
txnState === TxnState.TRANSACTION_COMMITTED_EMPTY | ||
) { | ||
callback(new MongoError('Cannot call abortTransaction after calling commitTransaction')); | ||
return; | ||
} | ||
} | ||
// construct and send the command | ||
const command = { [commandName]: 1 }; | ||
if (clientSession.transactionOptions.writeConcern) { | ||
Object.assign(command, { writeConcern: clientSession.transactionOptions.writeConcern }); | ||
} else if (clientSession.clientOptions && clientSession.clientOptions.w) { | ||
Object.assign(command, { writeConcern: { w: clientSession.clientOptions.w } }); | ||
// apply a writeConcern if specified | ||
if (session.transaction.options.writeConcern) { | ||
Object.assign(command, { writeConcern: session.transaction.options.writeConcern }); | ||
} else if (session.clientOptions && session.clientOptions.w) { | ||
Object.assign(command, { writeConcern: { w: session.clientOptions.w } }); | ||
} | ||
function commandHandler(e, r) { | ||
resetTransactionState(clientSession); | ||
if (commandName === 'commitTransaction') { | ||
session.transaction.transition(TxnState.TRANSACTION_COMMITTED); | ||
if ( | ||
e && | ||
(e instanceof MongoNetworkError || | ||
e instanceof MongoWriteConcernError || | ||
isRetryableError(e)) | ||
) { | ||
if (e.errorLabels) { | ||
const idx = e.errorLabels.indexOf('TransientTransactionError'); | ||
if (idx !== -1) { | ||
e.errorLabels.splice(idx, 1); | ||
} | ||
} else { | ||
e.errorLabels = []; | ||
} | ||
e.errorLabels.push('UnknownTransactionCommitResult'); | ||
} | ||
} else { | ||
session.transaction.transition(TxnState.TRANSACTION_ABORTED); | ||
} | ||
callback(e, r); | ||
} | ||
// The spec indicates that we should ignore all errors on `abortTransaction` | ||
function transactionError(err) { | ||
@@ -300,27 +330,13 @@ return commandName === 'commitTransaction' ? err : null; | ||
// send the command | ||
clientSession.topology.command( | ||
'admin.$cmd', | ||
command, | ||
{ session: clientSession }, | ||
(err, reply) => { | ||
if (err && isRetryableError(err)) { | ||
return clientSession.topology.command( | ||
'admin.$cmd', | ||
command, | ||
{ session: clientSession }, | ||
(_err, _reply) => commandHandler(transactionError(_err), _reply) | ||
); | ||
} | ||
session.topology.command('admin.$cmd', command, { session }, (err, reply) => { | ||
if (err && isRetryableError(err)) { | ||
return session.topology.command('admin.$cmd', command, { session }, (_err, _reply) => | ||
commandHandler(transactionError(_err), _reply) | ||
); | ||
} | ||
commandHandler(transactionError(err), reply); | ||
} | ||
); | ||
commandHandler(transactionError(err), reply); | ||
}); | ||
} | ||
Object.defineProperty(ClientSession.prototype, 'id', { | ||
get: function() { | ||
return this.serverSession.id; | ||
} | ||
}); | ||
/** | ||
@@ -408,5 +424,6 @@ * | ||
module.exports = { | ||
ClientSession: ClientSession, | ||
ServerSession: ServerSession, | ||
ServerSessionPool: ServerSessionPool | ||
ClientSession, | ||
ServerSession, | ||
ServerSessionPool, | ||
TxnState | ||
}; |
@@ -10,3 +10,2 @@ 'use strict'; | ||
const MongoError = require('../error').MongoError; | ||
const errors = require('../error'); | ||
const Server = require('./server'); | ||
@@ -19,3 +18,4 @@ const clone = require('./shared').clone; | ||
const isRetryableWritesSupported = require('./shared').isRetryableWritesSupported; | ||
const relayEvents = require('./shared').relayEvents; | ||
const relayEvents = require('../utils').relayEvents; | ||
const isRetryableError = require('../error').isRetryableError; | ||
const BSON = retrieveBSON(); | ||
@@ -905,3 +905,3 @@ | ||
if (!err) return callback(null, result); | ||
if (!(err instanceof errors.MongoNetworkError) && !err.message.match(/not master/)) { | ||
if (!isRetryableError(err)) { | ||
return callback(err); | ||
@@ -1025,2 +1025,8 @@ } | ||
const RETRYABLE_WRITE_OPERATIONS = ['findAndModify', 'insert', 'update', 'delete']; | ||
function isWriteCommand(command) { | ||
return RETRYABLE_WRITE_OPERATIONS.some(op => command[op]); | ||
} | ||
/** | ||
@@ -1064,4 +1070,32 @@ * Execute a command | ||
const willRetryWrite = | ||
!options.retrying && | ||
options.retryWrites && | ||
options.session && | ||
isRetryableWritesSupported(self) && | ||
!options.session.inTransaction() && | ||
isWriteCommand(cmd); | ||
const cb = (err, result) => { | ||
if (!err) return callback(null, result); | ||
if (!isRetryableError(err)) { | ||
return callback(err); | ||
} | ||
if (willRetryWrite) { | ||
const newOptions = Object.assign({}, clonedOptions, { retrying: true }); | ||
return this.command(ns, cmd, newOptions, callback); | ||
} | ||
return callback(err); | ||
}; | ||
// increment and assign txnNumber | ||
if (willRetryWrite) { | ||
options.session.incrementTransactionNumber(); | ||
options.willRetryWrite = willRetryWrite; | ||
} | ||
// Execute the command | ||
server.command(ns, cmd, clonedOptions, callback); | ||
server.command(ns, cmd, clonedOptions, cb); | ||
}; | ||
@@ -1068,0 +1102,0 @@ |
@@ -44,14 +44,42 @@ 'use strict'; | ||
const ReadPreference = function(mode, tags, options) { | ||
// TODO(major): tags MUST be an array of tagsets | ||
if (tags && !Array.isArray(tags)) { | ||
console.warn( | ||
'ReadPreference tags must be an array, this will change in the next major version' | ||
); | ||
if (typeof tags.maxStalenessSeconds !== 'undefined') { | ||
// this is likely an options object | ||
options = tags; | ||
tags = undefined; | ||
} else { | ||
tags = [tags]; | ||
} | ||
} | ||
this.mode = mode; | ||
this.tags = tags; | ||
this.options = options; | ||
// Add the maxStalenessSeconds value to the read Preference | ||
if (this.options && this.options.maxStalenessSeconds != null) { | ||
this.options = options; | ||
this.maxStalenessSeconds = | ||
this.options.maxStalenessSeconds >= 0 ? this.options.maxStalenessSeconds : null; | ||
} else if (tags && typeof tags === 'object') { | ||
(this.options = tags), (tags = null); | ||
options = options || {}; | ||
if (options.maxStalenessSeconds != null) { | ||
if (options.maxStalenessSeconds <= 0) { | ||
throw new TypeError('maxStalenessSeconds must be a positive integer'); | ||
} | ||
this.maxStalenessSeconds = options.maxStalenessSeconds; | ||
// NOTE: The minimum required wire version is 5 for this read preference. If the existing | ||
// topology has a lower value then a MongoError will be thrown during server selection. | ||
this.minWireVersion = 5; | ||
} | ||
if (this.mode === ReadPreference.PRIMARY || this.mode === true) { | ||
if (this.tags && Array.isArray(this.tags) && this.tags.length > 0) { | ||
throw new TypeError('Primary read preference cannot be combined with tags'); | ||
} | ||
if (this.maxStalenessSeconds) { | ||
throw new TypeError('Primary read preference cannot be combined with maxStalenessSeconds'); | ||
} | ||
} | ||
}; | ||
@@ -58,0 +86,0 @@ |
@@ -11,3 +11,2 @@ 'use strict'; | ||
const MongoError = require('../error').MongoError; | ||
const errors = require('../error'); | ||
const Server = require('./server'); | ||
@@ -21,3 +20,4 @@ const ReplSetState = require('./replset_state'); | ||
const isRetryableWritesSupported = require('./shared').isRetryableWritesSupported; | ||
const relayEvents = require('./shared').relayEvents; | ||
const relayEvents = require('../utils').relayEvents; | ||
const isRetryableError = require('../error').isRetryableError; | ||
@@ -1027,3 +1027,2 @@ const defaultAuthProviders = require('../auth/defaultAuthProviders').defaultAuthProviders; | ||
this.intervalIds[i].stop(); | ||
this.intervalIds[i].stop(); | ||
} | ||
@@ -1171,9 +1170,2 @@ | ||
function ensureTransactionAutostart(session) { | ||
if (!session) return; | ||
if (!session.inTransaction() && session.autoStartTransaction) { | ||
session.startTransaction(); | ||
} | ||
} | ||
// | ||
@@ -1197,3 +1189,3 @@ // Execute write operation | ||
!args.retrying && | ||
options.retryWrites && | ||
!!options.retryWrites && | ||
options.session && | ||
@@ -1215,3 +1207,3 @@ isRetryableWritesSupported(self) && | ||
if (!err) return callback(null, result); | ||
if (!(err instanceof errors.MongoNetworkError) && !err.message.match(/not master/)) { | ||
if (!isRetryableError(err)) { | ||
return callback(err); | ||
@@ -1243,11 +1235,3 @@ } | ||
// optionally autostart transaction if requested | ||
ensureTransactionAutostart(options.session); | ||
self.s.replicaSetState.primary[op](ns, ops, options, handler); | ||
// We need to increment the statement id if we're in a transaction | ||
if (options.session && options.session.inTransaction()) { | ||
options.session.incrementStatementId(ops.length); | ||
} | ||
} | ||
@@ -1309,2 +1293,8 @@ | ||
const RETRYABLE_WRITE_OPERATIONS = ['findAndModify', 'insert', 'update', 'delete']; | ||
function isWriteCommand(command) { | ||
return RETRYABLE_WRITE_OPERATIONS.some(op => command[op]); | ||
} | ||
/** | ||
@@ -1333,13 +1323,2 @@ * Execute a command | ||
// optionally autostart transaction if requested | ||
ensureTransactionAutostart(options.session); | ||
if ( | ||
options.session && | ||
options.session.inTransaction() && | ||
!readPreference.equals(ReadPreference.primary) | ||
) { | ||
return callback(new MongoError('Read preference in a transaction must be primary')); | ||
} | ||
// If the readPreference is primary and we have no primary, store it | ||
@@ -1383,4 +1362,37 @@ if ( | ||
const willRetryWrite = | ||
!options.retrying && | ||
!!options.retryWrites && | ||
options.session && | ||
isRetryableWritesSupported(self) && | ||
!options.session.inTransaction() && | ||
isWriteCommand(cmd); | ||
const cb = (err, result) => { | ||
if (!err) return callback(null, result); | ||
if (!isRetryableError(err)) { | ||
return callback(err); | ||
} | ||
if (willRetryWrite) { | ||
const newOptions = Object.assign({}, options, { retrying: true }); | ||
return this.command(ns, cmd, newOptions, callback); | ||
} | ||
// Per SDAM, remove primary from replicaset | ||
if (this.s.replicaSetState.primary) { | ||
this.s.replicaSetState.remove(this.s.replicaSetState.primary, { force: true }); | ||
} | ||
return callback(err); | ||
}; | ||
// increment and assign txnNumber | ||
if (willRetryWrite) { | ||
options.session.incrementTransactionNumber(); | ||
options.willRetryWrite = willRetryWrite; | ||
} | ||
// Execute the command | ||
server.command(ns, cmd, options, callback); | ||
server.command(ns, cmd, options, cb); | ||
}; | ||
@@ -1387,0 +1399,0 @@ |
@@ -22,3 +22,3 @@ 'use strict'; | ||
SessionMixins = require('./shared').SessionMixins, | ||
relayEvents = require('./shared').relayEvents; | ||
relayEvents = require('../utils').relayEvents; | ||
@@ -312,3 +312,3 @@ function getSaslSupportedMechs(options) { | ||
if (ismaster.maxWireVersion >= 4) { | ||
return new ThreeTwoWireProtocolSupport(new TwoSixWireProtocolSupport()); | ||
return new ThreeTwoWireProtocolSupport(); | ||
} | ||
@@ -762,2 +762,6 @@ | ||
var query = self.wireProtocolHandler.command(self.s.bson, ns, cmd, {}, topology, options); | ||
if (query instanceof MongoError) { | ||
return callback(query, null); | ||
} | ||
// Set slave OK of the query | ||
@@ -812,11 +816,3 @@ query.slaveOk = options.readPreference ? options.readPreference.slaveOk() : false; | ||
// Execute write | ||
return self.wireProtocolHandler.insert( | ||
self.s.pool, | ||
self.ismaster, | ||
ns, | ||
self.s.bson, | ||
ops, | ||
options, | ||
callback | ||
); | ||
return self.wireProtocolHandler.insert(self.s.pool, ns, self.s.bson, ops, options, callback); | ||
}; | ||
@@ -856,11 +852,3 @@ | ||
// Execute write | ||
return self.wireProtocolHandler.update( | ||
self.s.pool, | ||
self.ismaster, | ||
ns, | ||
self.s.bson, | ||
ops, | ||
options, | ||
callback | ||
); | ||
return self.wireProtocolHandler.update(self.s.pool, ns, self.s.bson, ops, options, callback); | ||
}; | ||
@@ -900,11 +888,3 @@ | ||
// Execute write | ||
return self.wireProtocolHandler.remove( | ||
self.s.pool, | ||
self.ismaster, | ||
ns, | ||
self.s.bson, | ||
ops, | ||
options, | ||
callback | ||
); | ||
return self.wireProtocolHandler.remove(self.s.pool, ns, self.s.bson, ops, options, callback); | ||
}; | ||
@@ -911,0 +891,0 @@ |
@@ -420,12 +420,2 @@ 'use strict'; | ||
/** | ||
* Relays events for a given listener and emitter | ||
* | ||
* @param {EventEmitter} listener the EventEmitter to listen to the events for | ||
* @param {EventEmitter} emitter the EventEmitter to relay the events on | ||
*/ | ||
function relayEvents(listener, emitter, events) { | ||
events.forEach(eventName => listener.on(eventName, event => emitter.emit(eventName, event))); | ||
} | ||
module.exports.SessionMixins = SessionMixins; | ||
@@ -445,2 +435,1 @@ module.exports.resolveClusterTime = resolveClusterTime; | ||
module.exports.isRetryableWritesSupported = isRetryableWritesSupported; | ||
module.exports.relayEvents = relayEvents; |
@@ -112,3 +112,3 @@ 'use strict'; | ||
record = record.length > 1 ? record.join('') : record[0]; | ||
if (!record.includes('authSource') && !record.includes('replicaSet')) { | ||
if (record.indexOf('authSource') === -1 && record.indexOf('replicaSet') === -1) { | ||
return callback( | ||
@@ -115,0 +115,0 @@ new MongoParseError('Text record must only set `authSource` or `replicaSet`') |
'use strict'; | ||
const crypto = require('crypto'); | ||
const requireOptional = require('require_optional'); | ||
/** | ||
* Generate a UUIDv4 | ||
*/ | ||
const uuidV4 = () => { | ||
@@ -12,4 +16,43 @@ const result = crypto.randomBytes(16); | ||
/** | ||
* Returns the duration calculated from two high resolution timers in milliseconds | ||
* | ||
* @param {Object} started A high resolution timestamp created from `process.hrtime()` | ||
* @returns {Number} The duration in milliseconds | ||
*/ | ||
const calculateDurationInMs = started => { | ||
const hrtime = process.hrtime(started); | ||
return (hrtime[0] * 1e9 + hrtime[1]) / 1e6; | ||
}; | ||
/** | ||
* Relays events for a given listener and emitter | ||
* | ||
* @param {EventEmitter} listener the EventEmitter to listen to the events from | ||
* @param {EventEmitter} emitter the EventEmitter to relay the events to | ||
*/ | ||
function relayEvents(listener, emitter, events) { | ||
events.forEach(eventName => listener.on(eventName, event => emitter.emit(eventName, event))); | ||
} | ||
// Grab Kerberos values if they exist, otherwise set them to null | ||
let Kerberos = null; | ||
let MongoAuthProcess = null; | ||
try { | ||
const kerberos = requireOptional('kerberos'); | ||
if (kerberos) { | ||
Kerberos = kerberos.Kerberos; | ||
MongoAuthProcess = kerberos.processes.MongoAuthProcess; | ||
} | ||
} catch (err) { | ||
console.warn(err.message); | ||
} | ||
module.exports = { | ||
uuidV4: uuidV4 | ||
uuidV4, | ||
calculateDurationInMs, | ||
relayEvents, | ||
Kerberos, | ||
MongoAuthProcess | ||
}; |
@@ -46,3 +46,3 @@ 'use strict'; | ||
// Do we have bypassDocumentValidation set, then enable it on the write command | ||
if (typeof options.bypassDocumentValidation === 'boolean') { | ||
if (options.bypassDocumentValidation === true) { | ||
writeCommand.bypassDocumentValidation = options.bypassDocumentValidation; | ||
@@ -76,11 +76,11 @@ } | ||
// | ||
WireProtocol.prototype.insert = function(pool, ismaster, ns, bson, ops, options, callback) { | ||
WireProtocol.prototype.insert = function(pool, ns, bson, ops, options, callback) { | ||
executeWrite(pool, bson, 'insert', 'documents', ns, ops, options, callback); | ||
}; | ||
WireProtocol.prototype.update = function(pool, ismaster, ns, bson, ops, options, callback) { | ||
WireProtocol.prototype.update = function(pool, ns, bson, ops, options, callback) { | ||
executeWrite(pool, bson, 'update', 'updates', ns, ops, options, callback); | ||
}; | ||
WireProtocol.prototype.remove = function(pool, ismaster, ns, bson, ops, options, callback) { | ||
WireProtocol.prototype.remove = function(pool, ns, bson, ops, options, callback) { | ||
executeWrite(pool, bson, 'delete', 'deletes', ns, ops, options, callback); | ||
@@ -87,0 +87,0 @@ }; |
@@ -5,3 +5,2 @@ 'use strict'; | ||
const retrieveBSON = require('../connection/utils').retrieveBSON; | ||
const f = require('util').format; | ||
const MongoError = require('../error').MongoError; | ||
@@ -12,15 +11,21 @@ const MongoNetworkError = require('../error').MongoNetworkError; | ||
const Long = BSON.Long; | ||
const ReadPreference = require('../topologies/read_preference'); | ||
const TxnState = require('../transactions').TxnState; | ||
var WireProtocol = function(legacyWireProtocol) { | ||
this.legacyWireProtocol = legacyWireProtocol; | ||
}; | ||
const WireProtocol = function() {}; | ||
function isTransactionCommand(command) { | ||
return !!(command.commitTransaction || command.abortTransaction); | ||
} | ||
/** | ||
* Optionally decorate a command with transactions specific keys | ||
* Optionally decorate a command with sessions specific keys | ||
* | ||
* @param {Object} command the command to decorate | ||
* @param {ClientSession} session the session tracking transaction state | ||
* @param {boolean} [isRetryableWrite=false] if true, will be decorated for retryable writes | ||
* @param {Object} [options] Optional settings passed to calling operation | ||
* @param {Function} [callback] Optional callback passed from calling operation | ||
* @return {MongoError|null} An error, if some error condition was met | ||
*/ | ||
function decorateWithTransactionsData(command, session, isRetryableWrite) { | ||
function decorateWithSessionsData(command, session, options) { | ||
if (!session) { | ||
@@ -32,3 +37,4 @@ return; | ||
const serverSession = session.serverSession; | ||
const inTransaction = session.inTransaction(); | ||
const inTransaction = session.inTransaction() || isTransactionCommand(command); | ||
const isRetryableWrite = options.willRetryWrite; | ||
@@ -39,14 +45,32 @@ if (serverSession.txnNumber && (isRetryableWrite || inTransaction)) { | ||
// now try to apply tansaction-specific data | ||
// now attempt to apply transaction-specific sessions data | ||
if (!inTransaction) { | ||
if (session.transaction.state !== TxnState.NO_TRANSACTION) { | ||
session.transaction.transition(TxnState.NO_TRANSACTION); | ||
} | ||
// for causal consistency | ||
if (session.supports.causalConsistency && session.operationTime) { | ||
command.readConcern = command.readConcern || {}; | ||
Object.assign(command.readConcern, { afterClusterTime: session.operationTime }); | ||
} | ||
return; | ||
} | ||
command.stmtId = serverSession.stmtId; | ||
if (options.readPreference && !options.readPreference.equals(ReadPreference.primary)) { | ||
return new MongoError( | ||
`Read preference in a transaction must be primary, not: ${options.readPreference.mode}` | ||
); | ||
} | ||
// `autocommit` must always be false to differentiate from retryable writes | ||
command.autocommit = false; | ||
if (serverSession.stmtId === 0) { | ||
if (session.transaction.state === TxnState.STARTING_TRANSACTION) { | ||
session.transaction.transition(TxnState.TRANSACTION_IN_PROGRESS); | ||
command.startTransaction = true; | ||
const readConcern = session.transactionOptions.readConcern || session.clientOptions.readConcern; | ||
const readConcern = | ||
session.transaction.options.readConcern || session.clientOptions.readConcern; | ||
if (readConcern) { | ||
@@ -60,9 +84,2 @@ command.readConcern = readConcern; | ||
} | ||
} else { | ||
// Drivers MUST add this readConcern to the first command in a transaction and MUST NOT | ||
// automatically add any readConcern to subsequent commands. Drivers MUST ignore all other | ||
// readConcerns. | ||
if (command.readConcern) { | ||
delete command.readConcern; | ||
} | ||
} | ||
@@ -73,3 +90,3 @@ } | ||
// Execute a write operation | ||
var executeWrite = function(pool, bson, type, opsField, ns, ops, options, callback) { | ||
function executeWrite(pool, bson, type, opsField, ns, ops, options, callback) { | ||
if (ops.length === 0) throw new MongoError('insert must contain at least one document'); | ||
@@ -83,10 +100,10 @@ if (typeof options === 'function') { | ||
// Split the ns up to get db and collection | ||
var p = ns.split('.'); | ||
var d = p.shift(); | ||
const p = ns.split('.'); | ||
const d = p.shift(); | ||
// Options | ||
var ordered = typeof options.ordered === 'boolean' ? options.ordered : true; | ||
var writeConcern = options.writeConcern; | ||
const ordered = typeof options.ordered === 'boolean' ? options.ordered : true; | ||
const writeConcern = options.writeConcern; | ||
// return skeleton | ||
var writeCommand = {}; | ||
const writeCommand = {}; | ||
writeCommand[type] = p.join('.'); | ||
@@ -103,3 +120,3 @@ writeCommand[opsField] = ops; | ||
if (options.collation) { | ||
for (var i = 0; i < writeCommand[opsField].length; i++) { | ||
for (let i = 0; i < writeCommand[opsField].length; i++) { | ||
if (!writeCommand[opsField][i].collation) { | ||
@@ -112,3 +129,3 @@ writeCommand[opsField][i].collation = options.collation; | ||
// Do we have bypassDocumentValidation set, then enable it on the write command | ||
if (typeof options.bypassDocumentValidation === 'boolean') { | ||
if (options.bypassDocumentValidation === true) { | ||
writeCommand.bypassDocumentValidation = options.bypassDocumentValidation; | ||
@@ -118,8 +135,11 @@ } | ||
// optionally decorate command with transactions data | ||
decorateWithTransactionsData(writeCommand, options.session, options.willRetryWrite); | ||
const err = decorateWithSessionsData(writeCommand, options.session, options, callback); | ||
if (err) { | ||
return callback(err, null); | ||
} | ||
// Options object | ||
var opts = { command: true }; | ||
const opts = { command: true }; | ||
if (typeof options.session !== 'undefined') opts.session = options.session; | ||
var queryOptions = { checkKeys: false, numberToSkip: 0, numberToReturn: 1 }; | ||
const queryOptions = { checkKeys: false, numberToSkip: 0, numberToReturn: 1 }; | ||
if (type === 'insert') queryOptions.checkKeys = false; | ||
@@ -135,3 +155,3 @@ if (typeof options.checkKeys === 'boolean') queryOptions.checkKeys = options.checkKeys; | ||
// Create write command | ||
var cmd = new Query(bson, f('%s.$cmd', d), writeCommand, queryOptions); | ||
const cmd = new Query(bson, `${d}.$cmd`, writeCommand, queryOptions); | ||
// Execute command | ||
@@ -142,3 +162,3 @@ pool.write(cmd, opts, callback); | ||
} | ||
}; | ||
} | ||
@@ -149,11 +169,11 @@ // | ||
// | ||
WireProtocol.prototype.insert = function(pool, ismaster, ns, bson, ops, options, callback) { | ||
WireProtocol.prototype.insert = function(pool, ns, bson, ops, options, callback) { | ||
executeWrite(pool, bson, 'insert', 'documents', ns, ops, options, callback); | ||
}; | ||
WireProtocol.prototype.update = function(pool, ismaster, ns, bson, ops, options, callback) { | ||
WireProtocol.prototype.update = function(pool, ns, bson, ops, options, callback) { | ||
executeWrite(pool, bson, 'update', 'updates', ns, ops, options, callback); | ||
}; | ||
WireProtocol.prototype.remove = function(pool, ismaster, ns, bson, ops, options, callback) { | ||
WireProtocol.prototype.remove = function(pool, ns, bson, ops, options, callback) { | ||
executeWrite(pool, bson, 'delete', 'deletes', ns, ops, options, callback); | ||
@@ -164,8 +184,8 @@ }; | ||
// Build command namespace | ||
var parts = ns.split(/\./); | ||
const parts = ns.split(/\./); | ||
// Command namespace | ||
var commandns = f('%s.$cmd', parts.shift()); | ||
const commandns = `${parts.shift()}.$cmd`; | ||
const cursorId = cursorState.cursorId; | ||
// Create killCursor command | ||
var killcursorCmd = { | ||
const killcursorCmd = { | ||
killCursors: parts.join('.'), | ||
@@ -176,3 +196,3 @@ cursors: [cursorId] | ||
// Build Query object | ||
var query = new Query(bson, commandns, killcursorCmd, { | ||
const query = new Query(bson, commandns, killcursorCmd, { | ||
numberToSkip: 0, | ||
@@ -185,3 +205,3 @@ numberToReturn: -1, | ||
// Kill cursor callback | ||
var killCursorCallback = function(err, result) { | ||
function killCursorCallback(err, result) { | ||
if (err) { | ||
@@ -193,3 +213,3 @@ if (typeof callback !== 'function') return; | ||
// Result | ||
var r = result.message; | ||
const r = result.message; | ||
// If we have a timed out query or a cursor that was killed | ||
@@ -204,3 +224,3 @@ if ((r.responseFlags & (1 << 0)) !== 0) { | ||
return callback( | ||
new MongoError(f('invalid killCursors result returned for cursor id %s', cursorId)) | ||
new MongoError(`invalid killCursors result returned for cursor id ${cursorId}`) | ||
); | ||
@@ -213,3 +233,3 @@ } | ||
} | ||
}; | ||
} | ||
@@ -248,8 +268,8 @@ const options = { command: true }; | ||
// Build command namespace | ||
var parts = ns.split(/\./); | ||
const parts = ns.split(/\./); | ||
// Command namespace | ||
var commandns = f('%s.$cmd', parts.shift()); | ||
const commandns = `${parts.shift()}.$cmd`; | ||
// Create getMore command | ||
var getMoreCmd = { | ||
const getMoreCmd = { | ||
getMore: cursorState.cursorId, | ||
@@ -261,3 +281,6 @@ collection: parts.join('.'), | ||
// optionally decorate command with transactions data | ||
decorateWithTransactionsData(getMoreCmd, options.session); | ||
const err = decorateWithSessionsData(getMoreCmd, options.session, options, callback); | ||
if (err) { | ||
return callback(err, null); | ||
} | ||
@@ -269,3 +292,3 @@ if (cursorState.cmd.tailable && typeof cursorState.cmd.maxAwaitTimeMS === 'number') { | ||
// Build Query object | ||
var query = new Query(bson, commandns, getMoreCmd, { | ||
const query = new Query(bson, commandns, getMoreCmd, { | ||
numberToSkip: 0, | ||
@@ -278,6 +301,6 @@ numberToReturn: -1, | ||
// Query callback | ||
var queryCallback = function(err, result) { | ||
function queryCallback(err, result) { | ||
if (err) return callback(err); | ||
// Get the raw message | ||
var r = result.message; | ||
const r = result.message; | ||
@@ -302,3 +325,3 @@ // If we have a timed out query or a cursor that was killed | ||
// Ensure we have a Long valid cursor id | ||
var cursorId = | ||
const cursorId = | ||
typeof r.documents[0].cursor.id === 'number' | ||
@@ -314,6 +337,6 @@ ? Long.fromNumber(r.documents[0].cursor.id) | ||
callback(null, r.documents[0], r.connection); | ||
}; | ||
} | ||
// Query options | ||
var queryOptions = { command: true }; | ||
const queryOptions = { command: true }; | ||
@@ -345,7 +368,2 @@ // If we have a raw query decorate the function | ||
// We need to increment the statement id if we're in a transaction | ||
if (options.session && options.session.inTransaction()) { | ||
options.session.incrementStatementId(); | ||
} | ||
// Write out the getMore command | ||
@@ -358,3 +376,3 @@ connection.write(query, queryOptions, queryCallback); | ||
// Check if this is a wire protocol command or not | ||
var wireProtocolCommand = | ||
const wireProtocolCommand = | ||
typeof options.wireProtocolCommand === 'boolean' ? options.wireProtocolCommand : true; | ||
@@ -367,2 +385,3 @@ | ||
query = executeFindCommand(bson, ns, cmd, cursorState, topology, options); | ||
// Mark the cmd as virtual | ||
@@ -377,11 +396,13 @@ cmd.virtual = false; | ||
} else { | ||
throw new MongoError(f('command %s does not return a cursor', JSON.stringify(cmd))); | ||
return new MongoError(`command ${JSON.stringify(cmd)} does not return a cursor`); | ||
} | ||
if (query instanceof MongoError) { | ||
return query; | ||
} | ||
// optionally decorate query with transaction data | ||
decorateWithTransactionsData(query.query, options.session); | ||
// We need to increment the statement id if we're in a transaction | ||
if (options.session && options.session.inTransaction()) { | ||
options.session.incrementStatementId(); | ||
const err = decorateWithSessionsData(query.query, options.session, options); | ||
if (err) { | ||
return err; | ||
} | ||
@@ -449,7 +470,7 @@ | ||
// Execute a find command | ||
var executeFindCommand = function(bson, ns, cmd, cursorState, topology, options) { | ||
function executeFindCommand(bson, ns, cmd, cursorState, topology, options) { | ||
// Ensure we have at least some options | ||
options = options || {}; | ||
// Get the readPreference | ||
var readPreference = getReadPreference(cmd, options); | ||
const readPreference = getReadPreference(cmd, options); | ||
@@ -460,8 +481,8 @@ // Set the optional batchSize | ||
// Build command namespace | ||
var parts = ns.split(/\./); | ||
const parts = ns.split(/\./); | ||
// Command namespace | ||
var commandns = f('%s.$cmd', parts.shift()); | ||
const commandns = `${parts.shift()}.$cmd`; | ||
// Build actual find command | ||
var findCmd = { | ||
let findCmd = { | ||
find: parts.join('.') | ||
@@ -481,10 +502,10 @@ }; | ||
// Sort value | ||
var sortValue = cmd.sort; | ||
let sortValue = cmd.sort; | ||
// Handle issue of sort being an Array | ||
if (Array.isArray(sortValue)) { | ||
var sortObject = {}; | ||
const sortObject = {}; | ||
if (sortValue.length > 0 && !Array.isArray(sortValue[0])) { | ||
var sortDirection = sortValue[1]; | ||
let sortDirection = sortValue[1]; | ||
// Translate the sort order text | ||
@@ -501,3 +522,3 @@ if (sortDirection === 'asc') { | ||
for (var i = 0; i < sortValue.length; i++) { | ||
sortDirection = sortValue[i][1]; | ||
let sortDirection = sortValue[i][1]; | ||
// Translate the sort order text | ||
@@ -603,5 +624,5 @@ if (sortDirection === 'asc') { | ||
// Set up the serialize and ignoreUndefined fields | ||
var serializeFunctions = | ||
const serializeFunctions = | ||
typeof options.serializeFunctions === 'boolean' ? options.serializeFunctions : false; | ||
var ignoreUndefined = | ||
const ignoreUndefined = | ||
typeof options.ignoreUndefined === 'boolean' ? options.ignoreUndefined : false; | ||
@@ -618,6 +639,9 @@ | ||
// optionally decorate query with transaction data | ||
decorateWithTransactionsData(findCmd, options.session); | ||
const err = decorateWithSessionsData(findCmd, options.session, options); | ||
if (err) { | ||
return err; | ||
} | ||
// Build Query object | ||
var query = new Query(bson, commandns, findCmd, { | ||
const query = new Query(bson, commandns, findCmd, { | ||
numberToSkip: 0, | ||
@@ -636,15 +660,15 @@ numberToReturn: 1, | ||
return query; | ||
}; | ||
} | ||
// | ||
// Set up a command cursor | ||
var setupCommand = function(bson, ns, cmd, cursorState, topology, options) { | ||
function setupCommand(bson, ns, cmd, cursorState, topology, options) { | ||
// Set empty options object | ||
options = options || {}; | ||
// Get the readPreference | ||
var readPreference = getReadPreference(cmd, options); | ||
const readPreference = getReadPreference(cmd, options); | ||
// Final query | ||
var finalCmd = {}; | ||
for (var name in cmd) { | ||
let finalCmd = {}; | ||
for (let name in cmd) { | ||
finalCmd[name] = cmd[name]; | ||
@@ -654,10 +678,10 @@ } | ||
// Build command namespace | ||
var parts = ns.split(/\./); | ||
const parts = ns.split(/\./); | ||
// Serialize functions | ||
var serializeFunctions = | ||
const serializeFunctions = | ||
typeof options.serializeFunctions === 'boolean' ? options.serializeFunctions : false; | ||
// Set up the serialize and ignoreUndefined fields | ||
var ignoreUndefined = | ||
const ignoreUndefined = | ||
typeof options.ignoreUndefined === 'boolean' ? options.ignoreUndefined : false; | ||
@@ -674,6 +698,9 @@ | ||
// optionally decorate query with transaction data | ||
decorateWithTransactionsData(finalCmd, options.session); | ||
const err = decorateWithSessionsData(finalCmd, options.session, options); | ||
if (err) { | ||
return err; | ||
} | ||
// Build Query object | ||
var query = new Query(bson, f('%s.$cmd', parts.shift()), finalCmd, { | ||
const query = new Query(bson, `${parts.shift()}.$cmd`, finalCmd, { | ||
numberToSkip: 0, | ||
@@ -691,4 +718,4 @@ numberToReturn: -1, | ||
return query; | ||
}; | ||
} | ||
module.exports = WireProtocol; |
@@ -37,10 +37,2 @@ 'use strict'; | ||
if ( | ||
options.session && | ||
options.session.inTransaction() && | ||
!readPreference.equals(ReadPreference.primary) | ||
) { | ||
throw new MongoError('read preference in a transaction must be primary'); | ||
} | ||
return readPreference; | ||
@@ -47,0 +39,0 @@ }; |
{ | ||
"name": "mongodb-core", | ||
"version": "3.1.0-beta4", | ||
"version": "3.1.0", | ||
"description": "Core MongoDB driver functionality, no bells and whistles and meant for integration not end applications", | ||
@@ -11,3 +11,4 @@ "main": "index.js", | ||
"format": "prettier --print-width 100 --tab-width 2 --single-quote --write index.js 'test/**/*.js' 'lib/**/*.js'", | ||
"changelog": "conventional-changelog -p angular -i HISTORY.md -s" | ||
"changelog": "conventional-changelog -p angular -i HISTORY.md -s", | ||
"atlas": "node ./test/atlas.js" | ||
}, | ||
@@ -28,2 +29,3 @@ "repository": { | ||
"chai": "^4.1.2", | ||
"chai-subset": "^1.6.0", | ||
"co": "^4.6.0", | ||
@@ -34,11 +36,13 @@ "conventional-changelog-cli": "^1.3.5", | ||
"jsdoc": "3.5.4", | ||
"mongodb-extjson": "^2.1.2", | ||
"mongodb-mock-server": "^1.0.0", | ||
"mongodb-test-runner": "^1.1.18", | ||
"prettier": "^1.6.1", | ||
"prettier": "~1.12.0", | ||
"sinon": "^6.0.0", | ||
"snappy": "^6.0.1" | ||
}, | ||
"peerOptionalDependencies": { | ||
"kerberos": "^0.0.23", | ||
"kerberos": ">= 0.0.23 < 1.0.0", | ||
"snappy": "^6.0.1", | ||
"bson-ext": "1.0.5" | ||
"bson-ext": "^2.0.0" | ||
}, | ||
@@ -45,0 +49,0 @@ "author": "Christian Kvalheim", |
Sorry, the diff of this file is not supported yet
No v1
QualityPackage is not semver >=1. This means it is not stable and does not support ^ ranges.
Found 1 instance in 1 package
736291
55
15050
0
13