mongodb-core
Advanced tools
Comparing version 3.0.7 to 3.1.0-beta1
@@ -1,12 +0,1 @@ | ||
<a name="3.0.7"></a> | ||
## [3.0.7](https://github.com/mongodb-js/mongodb-core/compare/v3.0.6...v3.0.7) (2018-04-17) | ||
### Bug Fixes | ||
* **auth:** pass auth result to callback ([d2192fd](https://github.com/mongodb-js/mongodb-core/commit/d2192fd)) | ||
* **auth:** prevent stalling on authentication when connected ([f52cf68](https://github.com/mongodb-js/mongodb-core/commit/f52cf68)) | ||
<a name="3.0.6"></a> | ||
@@ -13,0 +2,0 @@ ## [3.0.6](https://github.com/mongodb-js/mongodb-core/compare/v3.0.5...v3.0.6) (2018-04-09) |
@@ -827,3 +827,3 @@ 'use strict'; | ||
for (var i = 0; i < connections.length; i++) { | ||
authenticate(self, args, connections[i], function(err, result) { | ||
authenticate(self, args, connections[i], function(err) { | ||
connectionsCount = connectionsCount - 1; | ||
@@ -854,5 +854,5 @@ | ||
return cb(error, result); | ||
return cb(error); | ||
} | ||
cb(null, result); | ||
cb(null); | ||
} | ||
@@ -874,3 +874,3 @@ }); | ||
// Authenticate all live connections | ||
authenticateLiveConnections(self, args, function(err, result) { | ||
authenticateLiveConnections(self, args, function(err) { | ||
// Credentials correctly stored in auth provider if successful | ||
@@ -880,3 +880,3 @@ // Any new connections will now reauthenticate correctly | ||
// Return after authentication connections | ||
callback(err, result); | ||
callback(err); | ||
}); | ||
@@ -883,0 +883,0 @@ }); |
'use strict'; | ||
const retrieveBSON = require('./connection/utils').retrieveBSON, | ||
EventEmitter = require('events'), | ||
BSON = retrieveBSON(), | ||
Binary = BSON.Binary, | ||
uuidV4 = require('./utils').uuidV4; | ||
const retrieveBSON = require('./connection/utils').retrieveBSON; | ||
const EventEmitter = require('events'); | ||
const BSON = retrieveBSON(); | ||
const Binary = BSON.Binary; | ||
const uuidV4 = require('./utils').uuidV4; | ||
const MongoError = require('./error').MongoError; | ||
const MongoNetworkError = require('./error').MongoNetworkError; | ||
/** | ||
* | ||
*/ | ||
function assertAlive(session, callback) { | ||
if (session.serverSession == null) { | ||
const error = new MongoError('Cannot use a session that has ended'); | ||
if (typeof callback === 'function') { | ||
callback(error, null); | ||
return false; | ||
} | ||
throw error; | ||
} | ||
return true; | ||
} | ||
/** A class representing a client session on the server */ | ||
class ClientSession extends EventEmitter { | ||
constructor(topology, sessionPool, options) { | ||
/** | ||
* Create a client session. | ||
* WARNING: not meant to be instantiated directly | ||
* | ||
* @param {Topology} topology The current client's topology | ||
* @param {ServerSessionPool} sessionPool The server session pool | ||
* @param {Object} [options] Optional settings | ||
* @param {Boolean} [options.causalConsistency] Whether causal consistency should be enabled on this session | ||
* @param {Boolean} [options.autoStartTransaction=false] When enabled this session automatically starts a transaction with the provided defaultTransactionOptions. | ||
* @param {Object} [options.defaultTransactionOptions] The default TransactionOptions to use for transactions started on this session. | ||
* @param {Object} [clientOptions] Optional settings provided when creating a client in the porcelain driver | ||
*/ | ||
constructor(topology, sessionPool, options, clientOptions) { | ||
super(); | ||
@@ -29,5 +55,7 @@ | ||
this.serverSession = sessionPool.acquire(); | ||
this.clientOptions = clientOptions; | ||
this.supports = { | ||
causalConsistency: !!options.causalConsistency | ||
causalConsistency: | ||
typeof options.causalConsistency !== 'undefined' ? options.causalConsistency : true | ||
}; | ||
@@ -43,9 +71,15 @@ | ||
this.operationTime = null; | ||
this.explicit = !!options.explicit; | ||
this.owner = options.owner; | ||
this.transactionOptions = null; | ||
this.autoStartTransaction = options.autoStartTransaction; | ||
this.defaultTransactionOptions = Object.assign({}, options.defaultTransactionOptions); | ||
} | ||
/** | ||
* Ends this session on the server | ||
* | ||
* @param {Object} [options] Optional settings | ||
* @param {Boolean} [options.skipCommand] Skip sending the actual endSessions command to the server | ||
* @param {Function} [callback] Optional callback for completion of this operation | ||
*/ | ||
@@ -61,2 +95,6 @@ endSession(options, callback) { | ||
if (this.serverSession && this.inTransaction()) { | ||
this.abortTransaction(); // pass in callback? | ||
} | ||
if (!options.skipCommand) { | ||
@@ -104,4 +142,182 @@ // send the `endSessions` command | ||
} | ||
/** | ||
* Increment the transaction number on the internal ServerSession | ||
*/ | ||
incrementTransactionNumber() { | ||
this.serverSession.txnNumber++; | ||
} | ||
/** | ||
* Increment the statement id on the internal ServerSession | ||
* | ||
* @param {Number} [operationCount] the number of operations performed | ||
*/ | ||
incrementStatementId(operationCount) { | ||
operationCount = operationCount || 1; | ||
this.serverSession.stmtId += operationCount; | ||
} | ||
/** | ||
* @returns whether this session is current in a transaction or not | ||
*/ | ||
inTransaction() { | ||
return this.transactionOptions != null; | ||
} | ||
/** | ||
* Starts a new transaction with the given options. | ||
* | ||
* @param {Object} options Optional settings | ||
* @param {ReadConcern} [options.readConcern] The readConcern to use for this transaction | ||
* @param {WriteConcern} [options.writeConcern] The writeConcern to use for this transaction | ||
*/ | ||
startTransaction(options) { | ||
assertAlive(this); | ||
if (this.inTransaction()) { | ||
throw new MongoError('Transaction already in progress'); | ||
} | ||
// increment txnNumber and reset stmtId to zero. | ||
this.serverSession.txnNumber += 1; | ||
this.serverSession.stmtId = 0; | ||
// set transaction options, we will use this to determine if we are in a transaction | ||
this.transactionOptions = Object.assign({}, options || this.defaultTransactionOptions); | ||
} | ||
/** | ||
* Commits the currently active transaction in this session. | ||
* | ||
* @param {Function} [callback] optional callback for completion of this operation | ||
* @return {Promise} A promise is returned if no callback is provided | ||
*/ | ||
commitTransaction(callback) { | ||
if (typeof callback === 'function') { | ||
endTransaction(this, 'commitTransaction', callback); | ||
return; | ||
} | ||
return new Promise((resolve, reject) => { | ||
endTransaction( | ||
this, | ||
'commitTransaction', | ||
(err, reply) => (err ? reject(err) : resolve(reply)) | ||
); | ||
}); | ||
} | ||
/** | ||
* Aborts the currently active transaction in this session. | ||
* | ||
* @param {Function} [callback] optional callback for completion of this operation | ||
* @return {Promise} A promise is returned if no callback is provided | ||
*/ | ||
abortTransaction(callback) { | ||
if (typeof callback === 'function') { | ||
endTransaction(this, 'abortTransaction', callback); | ||
return; | ||
} | ||
return new Promise((resolve, reject) => { | ||
endTransaction( | ||
this, | ||
'abortTransaction', | ||
(err, reply) => (err ? reject(err) : resolve(reply)) | ||
); | ||
}); | ||
} | ||
} | ||
// see: https://github.com/mongodb/specifications/blob/master/source/retryable-writes/retryable-writes.rst#terms | ||
const RETRYABLE_ERROR_CODES = new Set([ | ||
6, // HostUnreachable | ||
7, // HostNotFound | ||
64, // WriteConcernFailed | ||
89, // NetworkTimeout | ||
91, // ShutdownInProgress | ||
189, // PrimarySteppedDown | ||
9001, // SocketException | ||
11600, // InterruptedAtShutdown | ||
11602, // InterruptedDueToReplStateChange | ||
10107, // NotMaster | ||
13435, // NotMasterNoSlaveOk | ||
13436 // NotMasterOrSecondary | ||
]); | ||
function isRetryableError(error) { | ||
if ( | ||
RETRYABLE_ERROR_CODES.has(error.code) || | ||
error instanceof MongoNetworkError || | ||
error.message.match(/not master/) || | ||
error.message.match(/node is recovering/) | ||
) { | ||
return true; | ||
} | ||
return false; | ||
} | ||
function resetTransactionState(clientSession) { | ||
clientSession.transactionOptions = null; | ||
} | ||
function endTransaction(clientSession, commandName, callback) { | ||
if (!assertAlive(clientSession, callback)) { | ||
// checking result in case callback was called | ||
return; | ||
} | ||
if (!clientSession.inTransaction()) { | ||
if (clientSession.autoStartTransaction) { | ||
clientSession.startTransaction(); | ||
} else { | ||
callback(new MongoError('No transaction started')); | ||
return; | ||
} | ||
} | ||
if (clientSession.serverSession.stmtId === 0) { | ||
// The server transaction was never started. | ||
resetTransactionState(clientSession); | ||
callback(null, null); | ||
return; | ||
} | ||
const command = { [commandName]: 1 }; | ||
if (clientSession.transactionOptions.writeConcern) { | ||
Object.assign(command, { writeConcern: clientSession.transactionOptions.writeConcern }); | ||
} else if (clientSession.clientOptions && clientSession.clientOptions.w) { | ||
Object.assign(command, { writeConcern: { w: clientSession.clientOptions.w } }); | ||
} | ||
function commandHandler(e, r) { | ||
resetTransactionState(clientSession); | ||
callback(e, r); | ||
} | ||
function transactionError(err) { | ||
return commandName === 'commitTransaction' ? err : null; | ||
} | ||
// send the command | ||
clientSession.topology.command( | ||
'admin.$cmd', | ||
command, | ||
{ session: clientSession }, | ||
(err, reply) => { | ||
if (err && isRetryableError(err)) { | ||
return clientSession.topology.command( | ||
'admin.$cmd', | ||
command, | ||
{ session: clientSession }, | ||
(_err, _reply) => commandHandler(transactionError(_err), _reply) | ||
); | ||
} | ||
commandHandler(transactionError(err), reply); | ||
} | ||
); | ||
} | ||
Object.defineProperty(ClientSession.prototype, 'id', { | ||
@@ -108,0 +324,0 @@ get: function() { |
'use strict'; | ||
const inherits = require('util').inherits, | ||
f = require('util').format, | ||
EventEmitter = require('events').EventEmitter, | ||
BasicCursor = require('../cursor'), | ||
Logger = require('../connection/logger'), | ||
retrieveBSON = require('../connection/utils').retrieveBSON, | ||
MongoError = require('../error').MongoError, | ||
errors = require('../error'), | ||
Server = require('./server'), | ||
clone = require('./shared').clone, | ||
diff = require('./shared').diff, | ||
cloneOptions = require('./shared').cloneOptions, | ||
createClientInfo = require('./shared').createClientInfo, | ||
SessionMixins = require('./shared').SessionMixins, | ||
isRetryableWritesSupported = require('./shared').isRetryableWritesSupported, | ||
getNextTransactionNumber = require('./shared').getNextTransactionNumber, | ||
relayEvents = require('./shared').relayEvents; | ||
const inherits = require('util').inherits; | ||
const f = require('util').format; | ||
const EventEmitter = require('events').EventEmitter; | ||
const BasicCursor = require('../cursor'); | ||
const Logger = require('../connection/logger'); | ||
const retrieveBSON = require('../connection/utils').retrieveBSON; | ||
const MongoError = require('../error').MongoError; | ||
const errors = require('../error'); | ||
const Server = require('./server'); | ||
const clone = require('./shared').clone; | ||
const diff = require('./shared').diff; | ||
const cloneOptions = require('./shared').cloneOptions; | ||
const createClientInfo = require('./shared').createClientInfo; | ||
const SessionMixins = require('./shared').SessionMixins; | ||
const isRetryableWritesSupported = require('./shared').isRetryableWritesSupported; | ||
const relayEvents = require('./shared').relayEvents; | ||
const BSON = retrieveBSON(); | ||
@@ -912,3 +910,3 @@ | ||
// increment and assign txnNumber | ||
options.txnNumber = getNextTransactionNumber(options.session); | ||
options.session.incrementTransactionNumber(); | ||
@@ -915,0 +913,0 @@ server[op](ns, ops, options, (err, result) => { |
'use strict'; | ||
var inherits = require('util').inherits, | ||
f = require('util').format, | ||
EventEmitter = require('events').EventEmitter, | ||
ReadPreference = require('./read_preference'), | ||
BasicCursor = require('../cursor'), | ||
retrieveBSON = require('../connection/utils').retrieveBSON, | ||
Logger = require('../connection/logger'), | ||
MongoError = require('../error').MongoError, | ||
errors = require('../error'), | ||
Server = require('./server'), | ||
ReplSetState = require('./replset_state'), | ||
clone = require('./shared').clone, | ||
Timeout = require('./shared').Timeout, | ||
Interval = require('./shared').Interval, | ||
createClientInfo = require('./shared').createClientInfo, | ||
SessionMixins = require('./shared').SessionMixins, | ||
isRetryableWritesSupported = require('./shared').isRetryableWritesSupported, | ||
getNextTransactionNumber = require('./shared').getNextTransactionNumber, | ||
relayEvents = require('./shared').relayEvents; | ||
const inherits = require('util').inherits; | ||
const f = require('util').format; | ||
const EventEmitter = require('events').EventEmitter; | ||
const ReadPreference = require('./read_preference'); | ||
const BasicCursor = require('../cursor'); | ||
const retrieveBSON = require('../connection/utils').retrieveBSON; | ||
const Logger = require('../connection/logger'); | ||
const MongoError = require('../error').MongoError; | ||
const errors = require('../error'); | ||
const Server = require('./server'); | ||
const ReplSetState = require('./replset_state'); | ||
const clone = require('./shared').clone; | ||
const Timeout = require('./shared').Timeout; | ||
const Interval = require('./shared').Interval; | ||
const createClientInfo = require('./shared').createClientInfo; | ||
const SessionMixins = require('./shared').SessionMixins; | ||
const isRetryableWritesSupported = require('./shared').isRetryableWritesSupported; | ||
const relayEvents = require('./shared').relayEvents; | ||
@@ -1181,2 +1180,9 @@ var MongoCR = require('../auth/mongocr'), | ||
function ensureTransactionAutostart(session) { | ||
if (!session) return; | ||
if (!session.inTransaction() && session.autoStartTransaction) { | ||
session.startTransaction(); | ||
} | ||
} | ||
// | ||
@@ -1194,6 +1200,12 @@ // Execute write operation | ||
if (self.state === DESTROYED) return callback(new MongoError(f('topology was destroyed'))); | ||
if (self.state === DESTROYED) { | ||
return callback(new MongoError(f('topology was destroyed'))); | ||
} | ||
const willRetryWrite = | ||
!args.retrying && options.retryWrites && options.session && isRetryableWritesSupported(self); | ||
!args.retrying && | ||
options.retryWrites && | ||
options.session && | ||
isRetryableWritesSupported(self) && | ||
!options.session.inTransaction(); | ||
@@ -1235,6 +1247,14 @@ if (!self.s.replicaSetState.hasPrimary()) { | ||
if (willRetryWrite) { | ||
options.txnNumber = getNextTransactionNumber(options.session); | ||
options.session.incrementTransactionNumber(); | ||
} | ||
return self.s.replicaSetState.primary[op](ns, ops, options, handler); | ||
// optionally autostart transaction if requested | ||
ensureTransactionAutostart(options.session); | ||
self.s.replicaSetState.primary[op](ns, ops, options, handler); | ||
// We need to increment the statement id if we're in a transaction | ||
if (options.session && options.session.inTransaction()) { | ||
options.session.incrementStatementId(ops.length); | ||
} | ||
} | ||
@@ -1319,2 +1339,13 @@ | ||
// optionally autostart transaction if requested | ||
ensureTransactionAutostart(options.session); | ||
if ( | ||
options.session && | ||
options.session.inTransaction() && | ||
!readPreference.equals(ReadPreference.primary) | ||
) { | ||
return callback(new MongoError('Read preference in a transaction must be primary')); | ||
} | ||
// If the readPreference is primary and we have no primary, store it | ||
@@ -1321,0 +1352,0 @@ if ( |
'use strict'; | ||
const os = require('os'), | ||
f = require('util').format, | ||
ReadPreference = require('./read_preference'), | ||
retrieveBSON = require('../connection/utils').retrieveBSON; | ||
const os = require('os'); | ||
const f = require('util').format; | ||
const ReadPreference = require('./read_preference'); | ||
const BSON = retrieveBSON(); | ||
/** | ||
@@ -424,12 +421,2 @@ * Emit event if it exists | ||
/** | ||
* Increment the transaction number on the ServerSession contained by the provided ClientSession | ||
* | ||
* @param {ClientSession} session | ||
*/ | ||
const getNextTransactionNumber = function(session) { | ||
session.serverSession.txnNumber++; | ||
return BSON.Long.fromNumber(session.serverSession.txnNumber); | ||
}; | ||
/** | ||
* Relays events for a given listener and emitter | ||
@@ -458,3 +445,2 @@ * | ||
module.exports.isRetryableWritesSupported = isRetryableWritesSupported; | ||
module.exports.getNextTransactionNumber = getNextTransactionNumber; | ||
module.exports.relayEvents = relayEvents; |
'use strict'; | ||
var Query = require('../connection/commands').Query, | ||
retrieveBSON = require('../connection/utils').retrieveBSON, | ||
f = require('util').format, | ||
MongoError = require('../error').MongoError, | ||
MongoNetworkError = require('../error').MongoNetworkError, | ||
getReadPreference = require('./shared').getReadPreference; | ||
const Query = require('../connection/commands').Query; | ||
const retrieveBSON = require('../connection/utils').retrieveBSON; | ||
const f = require('util').format; | ||
const MongoError = require('../error').MongoError; | ||
const MongoNetworkError = require('../error').MongoNetworkError; | ||
const getReadPreference = require('./shared').getReadPreference; | ||
const BSON = retrieveBSON(); | ||
const Long = BSON.Long; | ||
var BSON = retrieveBSON(), | ||
Long = BSON.Long; | ||
var WireProtocol = function(legacyWireProtocol) { | ||
@@ -17,2 +16,49 @@ this.legacyWireProtocol = legacyWireProtocol; | ||
/** | ||
* Optionally decorate a command with transactions specific keys | ||
* | ||
* @param {Object} command the command to decorate | ||
* @param {ClientSession} session the session tracking transaction state | ||
*/ | ||
function decorateWithTransactionsData(command, session) { | ||
if (!session) { | ||
return; | ||
} | ||
// first apply non-transaction-specific sessions data | ||
const serverSession = session.serverSession; | ||
if (serverSession.txnNumber) { | ||
command.txnNumber = BSON.Long.fromNumber(serverSession.txnNumber); | ||
} | ||
// now try to apply tansaction-specific data | ||
if (!session.inTransaction()) { | ||
return; | ||
} | ||
command.stmtId = serverSession.stmtId; | ||
command.autocommit = false; | ||
if (serverSession.stmtId === 0) { | ||
command.startTransaction = true; | ||
const readConcern = session.transactionOptions.readConcern || session.clientOptions.readConcern; | ||
if (readConcern) { | ||
command.readConcern = readConcern; | ||
} | ||
if (session.supports.causalConsistency && session.operationTime) { | ||
command.readConcern = command.readConcern || {}; | ||
Object.assign(command.readConcern, { afterClusterTime: session.operationTime }); | ||
} | ||
} else { | ||
// Drivers MUST add this readConcern to the first command in a transaction and MUST NOT | ||
// automatically add any readConcern to subsequent commands. Drivers MUST ignore all other | ||
// readConcerns. | ||
if (command.readConcern) { | ||
delete command.readConcern; | ||
} | ||
} | ||
} | ||
// | ||
@@ -60,6 +106,4 @@ // Execute a write operation | ||
// optionally add a `txnNumber` if retryable writes are being attempted | ||
if (typeof options.txnNumber !== 'undefined') { | ||
writeCommand.txnNumber = options.txnNumber; | ||
} | ||
// optionally decorate command with transactions data | ||
decorateWithTransactionsData(writeCommand, options.session); | ||
@@ -124,5 +168,2 @@ // Options object | ||
// Set query flags | ||
query.slaveOk = true; | ||
// Kill cursor callback | ||
@@ -199,2 +240,5 @@ var killCursorCallback = function(err, result) { | ||
// optionally decorate command with transactions data | ||
decorateWithTransactionsData(getMoreCmd, options.session); | ||
if (cursorState.cmd.tailable && typeof cursorState.cmd.maxAwaitTimeMS === 'number') { | ||
@@ -212,5 +256,2 @@ getMoreCmd.maxTimeMS = cursorState.cmd.maxAwaitTimeMS; | ||
// Set query flags | ||
query.slaveOk = true; | ||
// Query callback | ||
@@ -281,2 +322,7 @@ var queryCallback = function(err, result) { | ||
// We need to increment the statement id if we're in a transaction | ||
if (options.session && options.session.inTransaction()) { | ||
options.session.incrementStatementId(); | ||
} | ||
// Write out the getMore command | ||
@@ -293,5 +339,6 @@ connection.write(query, queryOptions, queryCallback); | ||
// Establish type of command | ||
let query; | ||
if (cmd.find && wireProtocolCommand) { | ||
// Create the find command | ||
var query = executeFindCommand(bson, ns, cmd, cursorState, topology, options); | ||
query = executeFindCommand(bson, ns, cmd, cursorState, topology, options); | ||
// Mark the cmd as virtual | ||
@@ -301,11 +348,19 @@ cmd.virtual = false; | ||
query.documentsReturnedIn = 'firstBatch'; | ||
// Return the query | ||
return query; | ||
} else if (cursorState.cursorId != null) { | ||
return; | ||
} else if (cmd) { | ||
return setupCommand(bson, ns, cmd, cursorState, topology, options); | ||
query = setupCommand(bson, ns, cmd, cursorState, topology, options); | ||
} else { | ||
throw new MongoError(f('command %s does not return a cursor', JSON.stringify(cmd))); | ||
} | ||
// optionally decorate query with transaction data | ||
decorateWithTransactionsData(query.query, options.session); | ||
// We need to increment the statement id if we're in a transaction | ||
if (options.session && options.session.inTransaction()) { | ||
options.session.incrementStatementId(); | ||
} | ||
return query; | ||
}; | ||
@@ -375,2 +430,3 @@ | ||
var readPreference = getReadPreference(cmd, options); | ||
// Set the optional batchSize | ||
@@ -533,2 +589,5 @@ cursorState.batchSize = cmd.batchSize || cursorState.batchSize; | ||
// optionally decorate query with transaction data | ||
decorateWithTransactionsData(findCmd, options.session); | ||
// Build Query object | ||
@@ -584,2 +643,5 @@ var query = new Query(bson, commandns, findCmd, { | ||
// optionally decorate query with transaction data | ||
decorateWithTransactionsData(finalCmd, options.session); | ||
// Build Query object | ||
@@ -586,0 +648,0 @@ var query = new Query(bson, f('%s.$cmd', parts.shift()), finalCmd, { |
@@ -34,5 +34,13 @@ 'use strict'; | ||
if (!(readPreference instanceof ReadPreference)) { | ||
throw new MongoError('readPreference must be a ReadPreference instance'); | ||
throw new MongoError('read preference must be a ReadPreference instance'); | ||
} | ||
if ( | ||
options.session && | ||
options.session.inTransaction() && | ||
!readPreference.equals(ReadPreference.primary) | ||
) { | ||
throw new MongoError('read preference in a transaction must be primary'); | ||
} | ||
return readPreference; | ||
@@ -39,0 +47,0 @@ }; |
{ | ||
"name": "mongodb-core", | ||
"version": "3.0.7", | ||
"version": "3.1.0-beta1", | ||
"description": "Core MongoDB driver functionality, no bells and whistles and meant for integration not end applications", | ||
@@ -5,0 +5,0 @@ "main": "index.js", |
No v1
QualityPackage is not semver >=1. This means it is not stable and does not support ^ ranges.
Found 1 instance in 1 package
630658
12417
1