mongodb-core
Advanced tools
Comparing version 3.1.11 to 3.2.0-beta1
@@ -36,5 +36,7 @@ 'use strict'; | ||
EJSON: EJSON, | ||
Topology: require('./lib/sdam/topology'), | ||
// Raw operations | ||
Query: require('./lib/connection/commands').Query, | ||
// Auth mechanisms | ||
MongoCredentials: require('./lib/auth/mongo_credentials').MongoCredentials, | ||
defaultAuthProviders: require('./lib/auth/defaultAuthProviders').defaultAuthProviders, | ||
@@ -41,0 +43,0 @@ MongoCR: require('./lib/auth/mongocr'), |
'use strict'; | ||
const f = require('util').format; | ||
const Query = require('../connection/commands').Query; | ||
const MongoError = require('../error').MongoError; | ||
const AuthProvider = require('./auth_provider').AuthProvider; | ||
const retrieveKerberos = require('../utils').retrieveKerberos; | ||
let kerberos; | ||
var AuthSession = function(db, username, password, options) { | ||
this.db = db; | ||
this.username = username; | ||
this.password = password; | ||
this.options = options; | ||
}; | ||
AuthSession.prototype.equal = function(session) { | ||
return ( | ||
session.db === this.db && | ||
session.username === this.username && | ||
session.password === this.password | ||
); | ||
}; | ||
/** | ||
* Creates a new GSSAPI authentication mechanism | ||
* @class | ||
* @return {GSSAPI} A cursor instance | ||
* @extends AuthProvider | ||
*/ | ||
var GSSAPI = function(bson) { | ||
this.bson = bson; | ||
this.authStore = []; | ||
}; | ||
class GSSAPI extends AuthProvider { | ||
/** | ||
* Implementation of authentication for a single connection | ||
* @override | ||
*/ | ||
_authenticateSingleConnection(sendAuthCommand, connection, credentials, callback) { | ||
const source = credentials.source; | ||
const username = credentials.username; | ||
const password = credentials.password; | ||
const mechanismProperties = credentials.mechanismProperties; | ||
const gssapiServiceName = | ||
mechanismProperties['gssapiservicename'] || | ||
mechanismProperties['gssapiServiceName'] || | ||
'mongodb'; | ||
/** | ||
* Authenticate | ||
* @method | ||
* @param {{Server}|{ReplSet}|{Mongos}} server Topology the authentication method is being called on | ||
* @param {[]Connections} connections Connections to authenticate using this authenticator | ||
* @param {string} db Name of the database | ||
* @param {string} username Username | ||
* @param {string} password Password | ||
* @param {authResultCallback} callback The callback to return the result from the authentication | ||
* @return {object} | ||
*/ | ||
GSSAPI.prototype.auth = function(server, connections, db, username, password, options, callback) { | ||
var self = this; | ||
let kerberos; | ||
try { | ||
kerberos = retrieveKerberos(); | ||
} catch (e) { | ||
return callback(e, null); | ||
GSSAPIInitialize( | ||
this, | ||
kerberos.processes.MongoAuthProcess, | ||
source, | ||
username, | ||
password, | ||
source, | ||
gssapiServiceName, | ||
sendAuthCommand, | ||
connection, | ||
mechanismProperties, | ||
callback | ||
); | ||
} | ||
// TODO: remove this once we fix URI parsing | ||
var gssapiServiceName = options['gssapiservicename'] || options['gssapiServiceName'] || 'mongodb'; | ||
// Total connections | ||
var count = connections.length; | ||
if (count === 0) return callback(null, null); | ||
/** | ||
* Authenticate | ||
* @override | ||
* @method | ||
*/ | ||
auth(sendAuthCommand, connections, credentials, callback) { | ||
if (kerberos == null) { | ||
try { | ||
kerberos = retrieveKerberos(); | ||
} catch (e) { | ||
return callback(e, null); | ||
} | ||
} | ||
// Valid connections | ||
var numberOfValidConnections = 0; | ||
var errorObject = null; | ||
// For each connection we need to authenticate | ||
while (connections.length > 0) { | ||
// Execute MongoCR | ||
var execute = function(connection) { | ||
// Start Auth process for a connection | ||
GSSAPIInitialize( | ||
self, | ||
kerberos.processes.MongoAuthProcess, | ||
db, | ||
username, | ||
password, | ||
db, | ||
gssapiServiceName, | ||
server, | ||
connection, | ||
options, | ||
function(err, r) { | ||
// Adjust count | ||
count = count - 1; | ||
// If we have an error | ||
if (err) { | ||
errorObject = err; | ||
} else if (r.result['$err']) { | ||
errorObject = r.result; | ||
} else if (r.result['errmsg']) { | ||
errorObject = r.result; | ||
} else { | ||
numberOfValidConnections = numberOfValidConnections + 1; | ||
} | ||
// We have authenticated all connections | ||
if (count === 0 && numberOfValidConnections > 0) { | ||
// Store the auth details | ||
addAuthSession(self.authStore, new AuthSession(db, username, password, options)); | ||
// Return correct authentication | ||
callback(null, true); | ||
} else if (count === 0) { | ||
if (errorObject == null) | ||
errorObject = new MongoError(f('failed to authenticate using mongocr')); | ||
callback(errorObject, false); | ||
} | ||
} | ||
); | ||
}; | ||
var _execute = function(_connection) { | ||
process.nextTick(function() { | ||
execute(_connection); | ||
}); | ||
}; | ||
_execute(connections.shift()); | ||
super.auth(sendAuthCommand, connections, credentials, callback); | ||
} | ||
}; | ||
} | ||
@@ -129,3 +70,3 @@ // | ||
gssapiServiceName, | ||
server, | ||
sendAuthCommand, | ||
connection, | ||
@@ -160,3 +101,3 @@ options, | ||
authdb, | ||
server, | ||
sendAuthCommand, | ||
connection, | ||
@@ -179,3 +120,3 @@ callback | ||
authdb, | ||
server, | ||
sendAuthCommand, | ||
connection, | ||
@@ -193,32 +134,24 @@ callback | ||
// Write the commmand on the connection | ||
server( | ||
connection, | ||
new Query(self.bson, '$external.$cmd', command, { | ||
numberToSkip: 0, | ||
numberToReturn: 1 | ||
}), | ||
function(err, r) { | ||
sendAuthCommand(connection, '$external.$cmd', command, (err, doc) => { | ||
if (err) return callback(err, false); | ||
// Execute mongodb transition | ||
mongo_auth_process.transition(doc.payload, function(err, payload) { | ||
if (err) return callback(err, false); | ||
var doc = r.result; | ||
// Execute mongodb transition | ||
mongo_auth_process.transition(r.result.payload, function(err, payload) { | ||
if (err) return callback(err, false); | ||
// MongoDB API Second Step | ||
MongoDBGSSAPISecondStep( | ||
self, | ||
mongo_auth_process, | ||
payload, | ||
doc, | ||
db, | ||
username, | ||
password, | ||
authdb, | ||
server, | ||
connection, | ||
callback | ||
); | ||
}); | ||
} | ||
); | ||
// MongoDB API Second Step | ||
MongoDBGSSAPISecondStep( | ||
self, | ||
mongo_auth_process, | ||
payload, | ||
doc, | ||
db, | ||
username, | ||
password, | ||
authdb, | ||
sendAuthCommand, | ||
connection, | ||
callback | ||
); | ||
}); | ||
}); | ||
}; | ||
@@ -237,3 +170,3 @@ | ||
authdb, | ||
server, | ||
sendAuthCommand, | ||
connection, | ||
@@ -251,32 +184,24 @@ callback | ||
// Write the commmand on the connection | ||
server( | ||
connection, | ||
new Query(self.bson, '$external.$cmd', command, { | ||
numberToSkip: 0, | ||
numberToReturn: 1 | ||
}), | ||
function(err, r) { | ||
sendAuthCommand(connection, '$external.$cmd', command, (err, doc) => { | ||
if (err) return callback(err, false); | ||
// Call next transition for kerberos | ||
mongo_auth_process.transition(doc.payload, function(err, payload) { | ||
if (err) return callback(err, false); | ||
var doc = r.result; | ||
// Call next transition for kerberos | ||
mongo_auth_process.transition(doc.payload, function(err, payload) { | ||
if (err) return callback(err, false); | ||
// Call the last and third step | ||
MongoDBGSSAPIThirdStep( | ||
self, | ||
mongo_auth_process, | ||
payload, | ||
doc, | ||
db, | ||
username, | ||
password, | ||
authdb, | ||
server, | ||
connection, | ||
callback | ||
); | ||
}); | ||
} | ||
); | ||
// Call the last and third step | ||
MongoDBGSSAPIThirdStep( | ||
self, | ||
mongo_auth_process, | ||
payload, | ||
doc, | ||
db, | ||
username, | ||
password, | ||
authdb, | ||
sendAuthCommand, | ||
connection, | ||
callback | ||
); | ||
}); | ||
}); | ||
}; | ||
@@ -293,3 +218,3 @@ | ||
authdb, | ||
server, | ||
sendAuthCommand, | ||
connection, | ||
@@ -306,41 +231,8 @@ callback | ||
// Execute the command | ||
server( | ||
connection, | ||
new Query(self.bson, '$external.$cmd', command, { | ||
numberToSkip: 0, | ||
numberToReturn: 1 | ||
}), | ||
function(err, r) { | ||
if (err) return callback(err, false); | ||
mongo_auth_process.transition(null, function(err) { | ||
if (err) return callback(err, null); | ||
callback(null, r); | ||
}); | ||
} | ||
); | ||
}; | ||
// Add to store only if it does not exist | ||
var addAuthSession = function(authStore, session) { | ||
var found = false; | ||
for (var i = 0; i < authStore.length; i++) { | ||
if (authStore[i].equal(session)) { | ||
found = true; | ||
break; | ||
} | ||
} | ||
if (!found) authStore.push(session); | ||
}; | ||
/** | ||
* Remove authStore credentials | ||
* @method | ||
* @param {string} db Name of database we are removing authStore details about | ||
* @return {object} | ||
*/ | ||
GSSAPI.prototype.logout = function(dbName) { | ||
this.authStore = this.authStore.filter(function(x) { | ||
return x.db !== dbName; | ||
sendAuthCommand(connection, '$external.$cmd', command, (err, r) => { | ||
if (err) return callback(err, false); | ||
mongo_auth_process.transition(null, function(err) { | ||
if (err) return callback(err, null); | ||
callback(null, r); | ||
}); | ||
}); | ||
@@ -350,34 +242,2 @@ }; | ||
/** | ||
* Re authenticate pool | ||
* @method | ||
* @param {{Server}|{ReplSet}|{Mongos}} server Topology the authentication method is being called on | ||
* @param {[]Connections} connections Connections to authenticate using this authenticator | ||
* @param {authResultCallback} callback The callback to return the result from the authentication | ||
* @return {object} | ||
*/ | ||
GSSAPI.prototype.reauthenticate = function(server, connections, callback) { | ||
var authStore = this.authStore.slice(0); | ||
var count = authStore.length; | ||
if (count === 0) return callback(null, null); | ||
// Iterate over all the auth details stored | ||
for (var i = 0; i < authStore.length; i++) { | ||
this.auth( | ||
server, | ||
connections, | ||
authStore[i].db, | ||
authStore[i].username, | ||
authStore[i].password, | ||
authStore[i].options, | ||
function(err) { | ||
count = count - 1; | ||
// Done re-authenticating | ||
if (count === 0) { | ||
callback(err, null); | ||
} | ||
} | ||
); | ||
} | ||
}; | ||
/** | ||
* This is a result from a authentication strategy | ||
@@ -384,0 +244,0 @@ * |
'use strict'; | ||
var f = require('util').format, | ||
crypto = require('crypto'), | ||
Query = require('../connection/commands').Query, | ||
MongoError = require('../error').MongoError; | ||
const crypto = require('crypto'); | ||
const AuthProvider = require('./auth_provider').AuthProvider; | ||
var AuthSession = function(db, username, password) { | ||
this.db = db; | ||
this.username = username; | ||
this.password = password; | ||
}; | ||
AuthSession.prototype.equal = function(session) { | ||
return ( | ||
session.db === this.db && | ||
session.username === this.username && | ||
session.password === this.password | ||
); | ||
}; | ||
/** | ||
* Creates a new MongoCR authentication mechanism | ||
* @class | ||
* @return {MongoCR} A cursor instance | ||
* | ||
* @extends AuthProvider | ||
*/ | ||
var MongoCR = function(bson) { | ||
this.bson = bson; | ||
this.authStore = []; | ||
}; | ||
class MongoCR extends AuthProvider { | ||
/** | ||
* Implementation of authentication for a single connection | ||
* @override | ||
*/ | ||
_authenticateSingleConnection(sendAuthCommand, connection, credentials, callback) { | ||
const username = credentials.username; | ||
const password = credentials.password; | ||
const source = credentials.source; | ||
// Add to store only if it does not exist | ||
var addAuthSession = function(authStore, session) { | ||
var found = false; | ||
sendAuthCommand(connection, `${source}.$cmd`, { getnonce: 1 }, (err, r) => { | ||
let nonce = null; | ||
let key = null; | ||
for (var i = 0; i < authStore.length; i++) { | ||
if (authStore[i].equal(session)) { | ||
found = true; | ||
break; | ||
} | ||
} | ||
// Get nonce | ||
if (err == null) { | ||
nonce = r.nonce; | ||
// Use node md5 generator | ||
let md5 = crypto.createHash('md5'); | ||
// Generate keys used for authentication | ||
md5.update(username + ':mongo:' + password, 'utf8'); | ||
const hash_password = md5.digest('hex'); | ||
// Final key | ||
md5 = crypto.createHash('md5'); | ||
md5.update(nonce + username + hash_password, 'utf8'); | ||
key = md5.digest('hex'); | ||
} | ||
if (!found) authStore.push(session); | ||
}; | ||
const authenticateCommand = { | ||
authenticate: 1, | ||
user: username, | ||
nonce, | ||
key | ||
}; | ||
/** | ||
* Authenticate | ||
* @method | ||
* @param {{Server}|{ReplSet}|{Mongos}} server Topology the authentication method is being called on | ||
* @param {[]Connections} connections Connections to authenticate using this authenticator | ||
* @param {string} db Name of the database | ||
* @param {string} username Username | ||
* @param {string} password Password | ||
* @param {authResultCallback} callback The callback to return the result from the authentication | ||
* @return {object} | ||
*/ | ||
MongoCR.prototype.auth = function(server, connections, db, username, password, callback) { | ||
var self = this; | ||
// Total connections | ||
var count = connections.length; | ||
if (count === 0) return callback(null, null); | ||
// Valid connections | ||
var numberOfValidConnections = 0; | ||
var errorObject = null; | ||
// For each connection we need to authenticate | ||
while (connections.length > 0) { | ||
// Execute MongoCR | ||
var executeMongoCR = function(connection) { | ||
// Write the commmand on the connection | ||
server( | ||
connection, | ||
new Query( | ||
self.bson, | ||
f('%s.$cmd', db), | ||
{ | ||
getnonce: 1 | ||
}, | ||
{ | ||
numberToSkip: 0, | ||
numberToReturn: 1 | ||
} | ||
), | ||
function(err, r) { | ||
var nonce = null; | ||
var key = null; | ||
// Adjust the number of connections left | ||
// Get nonce | ||
if (err == null) { | ||
nonce = r.result.nonce; | ||
// Use node md5 generator | ||
var md5 = crypto.createHash('md5'); | ||
// Generate keys used for authentication | ||
md5.update(username + ':mongo:' + password, 'utf8'); | ||
var hash_password = md5.digest('hex'); | ||
// Final key | ||
md5 = crypto.createHash('md5'); | ||
md5.update(nonce + username + hash_password, 'utf8'); | ||
key = md5.digest('hex'); | ||
} | ||
// Execute command | ||
// Write the commmand on the connection | ||
server( | ||
connection, | ||
new Query( | ||
self.bson, | ||
f('%s.$cmd', db), | ||
{ | ||
authenticate: 1, | ||
user: username, | ||
nonce: nonce, | ||
key: key | ||
}, | ||
{ | ||
numberToSkip: 0, | ||
numberToReturn: 1 | ||
} | ||
), | ||
function(err, r) { | ||
count = count - 1; | ||
// If we have an error | ||
if (err) { | ||
errorObject = err; | ||
} else if (r.result['$err']) { | ||
errorObject = r.result; | ||
} else if (r.result['errmsg']) { | ||
errorObject = r.result; | ||
} else { | ||
numberOfValidConnections = numberOfValidConnections + 1; | ||
} | ||
// We have authenticated all connections | ||
if (count === 0 && numberOfValidConnections > 0) { | ||
// Store the auth details | ||
addAuthSession(self.authStore, new AuthSession(db, username, password)); | ||
// Return correct authentication | ||
callback(null, true); | ||
} else if (count === 0) { | ||
if (errorObject == null) | ||
errorObject = new MongoError(f('failed to authenticate using mongocr')); | ||
callback(errorObject, false); | ||
} | ||
} | ||
); | ||
} | ||
); | ||
}; | ||
var _execute = function(_connection) { | ||
process.nextTick(function() { | ||
executeMongoCR(_connection); | ||
}); | ||
}; | ||
_execute(connections.shift()); | ||
sendAuthCommand(connection, `${source}.$cmd`, authenticateCommand, callback); | ||
}); | ||
} | ||
}; | ||
} | ||
/** | ||
* Remove authStore credentials | ||
* @method | ||
* @param {string} db Name of database we are removing authStore details about | ||
* @return {object} | ||
*/ | ||
MongoCR.prototype.logout = function(dbName) { | ||
this.authStore = this.authStore.filter(function(x) { | ||
return x.db !== dbName; | ||
}); | ||
}; | ||
/** | ||
* Re authenticate pool | ||
* @method | ||
* @param {{Server}|{ReplSet}|{Mongos}} server Topology the authentication method is being called on | ||
* @param {[]Connections} connections Connections to authenticate using this authenticator | ||
* @param {authResultCallback} callback The callback to return the result from the authentication | ||
* @return {object} | ||
*/ | ||
MongoCR.prototype.reauthenticate = function(server, connections, callback) { | ||
var authStore = this.authStore.slice(0); | ||
var count = authStore.length; | ||
if (count === 0) return callback(null, null); | ||
// Iterate over all the auth details stored | ||
for (var i = 0; i < authStore.length; i++) { | ||
this.auth( | ||
server, | ||
connections, | ||
authStore[i].db, | ||
authStore[i].username, | ||
authStore[i].password, | ||
function(err) { | ||
count = count - 1; | ||
// Done re-authenticating | ||
if (count === 0) { | ||
callback(err, null); | ||
} | ||
} | ||
); | ||
} | ||
}; | ||
/** | ||
* This is a result from a authentication strategy | ||
* | ||
* @callback authResultCallback | ||
* @param {error} error An error object. Set to null if no error present | ||
* @param {boolean} result The result of the authentication process | ||
*/ | ||
module.exports = MongoCR; |
'use strict'; | ||
var f = require('util').format, | ||
retrieveBSON = require('../connection/utils').retrieveBSON, | ||
Query = require('../connection/commands').Query, | ||
MongoError = require('../error').MongoError; | ||
const retrieveBSON = require('../connection/utils').retrieveBSON; | ||
const AuthProvider = require('./auth_provider').AuthProvider; | ||
var BSON = retrieveBSON(), | ||
Binary = BSON.Binary; | ||
// TODO: can we get the Binary type from this.bson instead? | ||
const BSON = retrieveBSON(); | ||
const Binary = BSON.Binary; | ||
var AuthSession = function(db, username, password) { | ||
this.db = db; | ||
this.username = username; | ||
this.password = password; | ||
}; | ||
AuthSession.prototype.equal = function(session) { | ||
return ( | ||
session.db === this.db && | ||
session.username === this.username && | ||
session.password === this.password | ||
); | ||
}; | ||
/** | ||
* Creates a new Plain authentication mechanism | ||
* @class | ||
* @return {Plain} A cursor instance | ||
* | ||
* @extends AuthProvider | ||
*/ | ||
var Plain = function(bson) { | ||
this.bson = bson; | ||
this.authStore = []; | ||
}; | ||
/** | ||
* Authenticate | ||
* @method | ||
* @param {{Server}|{ReplSet}|{Mongos}} server Topology the authentication method is being called on | ||
* @param {[]Connections} connections Connections to authenticate using this authenticator | ||
* @param {string} db Name of the database | ||
* @param {string} username Username | ||
* @param {string} password Password | ||
* @param {authResultCallback} callback The callback to return the result from the authentication | ||
* @return {object} | ||
*/ | ||
Plain.prototype.auth = function(server, connections, db, username, password, callback) { | ||
var self = this; | ||
// Total connections | ||
var count = connections.length; | ||
if (count === 0) return callback(null, null); | ||
// Valid connections | ||
var numberOfValidConnections = 0; | ||
var errorObject = null; | ||
// For each connection we need to authenticate | ||
while (connections.length > 0) { | ||
// Execute MongoCR | ||
var execute = function(connection) { | ||
// Create payload | ||
var payload = new Binary(f('\x00%s\x00%s', username, password)); | ||
// Let's start the sasl process | ||
var command = { | ||
saslStart: 1, | ||
mechanism: 'PLAIN', | ||
payload: payload, | ||
autoAuthorize: 1 | ||
}; | ||
// Let's start the process | ||
server( | ||
connection, | ||
new Query(self.bson, '$external.$cmd', command, { | ||
numberToSkip: 0, | ||
numberToReturn: 1 | ||
}), | ||
function(err, r) { | ||
// Adjust count | ||
count = count - 1; | ||
// If we have an error | ||
if (err) { | ||
errorObject = err; | ||
} else if (r.result['$err']) { | ||
errorObject = r.result; | ||
} else if (r.result['errmsg']) { | ||
errorObject = r.result; | ||
} else { | ||
numberOfValidConnections = numberOfValidConnections + 1; | ||
} | ||
// We have authenticated all connections | ||
if (count === 0 && numberOfValidConnections > 0) { | ||
// Store the auth details | ||
addAuthSession(self.authStore, new AuthSession(db, username, password)); | ||
// Return correct authentication | ||
callback(null, true); | ||
} else if (count === 0) { | ||
if (errorObject == null) | ||
errorObject = new MongoError(f('failed to authenticate using mongocr')); | ||
callback(errorObject, false); | ||
} | ||
} | ||
); | ||
class Plain extends AuthProvider { | ||
/** | ||
* Implementation of authentication for a single connection | ||
* @override | ||
*/ | ||
_authenticateSingleConnection(sendAuthCommand, connection, credentials, callback) { | ||
const username = credentials.username; | ||
const password = credentials.password; | ||
const payload = new Binary(`\x00${username}\x00${password}`); | ||
const command = { | ||
saslStart: 1, | ||
mechanism: 'PLAIN', | ||
payload: payload, | ||
autoAuthorize: 1 | ||
}; | ||
var _execute = function(_connection) { | ||
process.nextTick(function() { | ||
execute(_connection); | ||
}); | ||
}; | ||
_execute(connections.shift()); | ||
sendAuthCommand(connection, '$external.$cmd', command, callback); | ||
} | ||
}; | ||
} | ||
// Add to store only if it does not exist | ||
var addAuthSession = function(authStore, session) { | ||
var found = false; | ||
for (var i = 0; i < authStore.length; i++) { | ||
if (authStore[i].equal(session)) { | ||
found = true; | ||
break; | ||
} | ||
} | ||
if (!found) authStore.push(session); | ||
}; | ||
/** | ||
* Remove authStore credentials | ||
* @method | ||
* @param {string} db Name of database we are removing authStore details about | ||
* @return {object} | ||
*/ | ||
Plain.prototype.logout = function(dbName) { | ||
this.authStore = this.authStore.filter(function(x) { | ||
return x.db !== dbName; | ||
}); | ||
}; | ||
/** | ||
* Re authenticate pool | ||
* @method | ||
* @param {{Server}|{ReplSet}|{Mongos}} server Topology the authentication method is being called on | ||
* @param {[]Connections} connections Connections to authenticate using this authenticator | ||
* @param {authResultCallback} callback The callback to return the result from the authentication | ||
* @return {object} | ||
*/ | ||
Plain.prototype.reauthenticate = function(server, connections, callback) { | ||
var authStore = this.authStore.slice(0); | ||
var count = authStore.length; | ||
if (count === 0) return callback(null, null); | ||
// Iterate over all the auth details stored | ||
for (var i = 0; i < authStore.length; i++) { | ||
this.auth( | ||
server, | ||
connections, | ||
authStore[i].db, | ||
authStore[i].username, | ||
authStore[i].password, | ||
function(err) { | ||
count = count - 1; | ||
// Done re-authenticating | ||
if (count === 0) { | ||
callback(err, null); | ||
} | ||
} | ||
); | ||
} | ||
}; | ||
/** | ||
* This is a result from a authentication strategy | ||
* | ||
* @callback authResultCallback | ||
* @param {error} error An error object. Set to null if no error present | ||
* @param {boolean} result The result of the authentication process | ||
*/ | ||
module.exports = Plain; |
'use strict'; | ||
var f = require('util').format, | ||
crypto = require('crypto'), | ||
retrieveBSON = require('../connection/utils').retrieveBSON, | ||
Query = require('../connection/commands').Query, | ||
MongoError = require('../error').MongoError, | ||
Buffer = require('safe-buffer').Buffer; | ||
const crypto = require('crypto'); | ||
const Buffer = require('safe-buffer').Buffer; | ||
const retrieveBSON = require('../connection/utils').retrieveBSON; | ||
const MongoError = require('../error').MongoError; | ||
const AuthProvider = require('./auth_provider').AuthProvider; | ||
const BSON = retrieveBSON(); | ||
const Binary = BSON.Binary; | ||
let saslprep; | ||
try { | ||
@@ -18,33 +19,2 @@ saslprep = require('saslprep'); | ||
var BSON = retrieveBSON(), | ||
Binary = BSON.Binary; | ||
var AuthSession = function(db, username, password) { | ||
this.db = db; | ||
this.username = username; | ||
this.password = password; | ||
}; | ||
AuthSession.prototype.equal = function(session) { | ||
return ( | ||
session.db === this.db && | ||
session.username === this.username && | ||
session.password === this.password | ||
); | ||
}; | ||
var id = 0; | ||
/** | ||
* Creates a new ScramSHA authentication mechanism | ||
* @class | ||
* @return {ScramSHA} A cursor instance | ||
*/ | ||
var ScramSHA = function(bson, cryptoMethod) { | ||
this.bson = bson; | ||
this.authStore = []; | ||
this.id = id++; | ||
this.cryptoMethod = cryptoMethod || 'sha1'; | ||
}; | ||
var parsePayload = function(payload) { | ||
@@ -140,50 +110,49 @@ var dict = {}; | ||
/** | ||
* Authenticate | ||
* @method | ||
* @param {{Server}|{ReplSet}|{Mongos}} server Topology the authentication method is being called on | ||
* @param {[]Connections} connections Connections to authenticate using this authenticator | ||
* @param {string} db Name of the database | ||
* @param {string} username Username | ||
* @param {string} password Password | ||
* @param {authResultCallback} callback The callback to return the result from the authentication | ||
* @return {object} | ||
* Creates a new ScramSHA authentication mechanism | ||
* @class | ||
* @extends AuthProvider | ||
*/ | ||
ScramSHA.prototype.auth = function(server, connections, db, username, password, callback) { | ||
var self = this; | ||
// Total connections | ||
var count = connections.length; | ||
if (count === 0) return callback(null, null); | ||
class ScramSHA extends AuthProvider { | ||
constructor(bson, cryptoMethod) { | ||
super(bson); | ||
this.cryptoMethod = cryptoMethod || 'sha1'; | ||
} | ||
// Valid connections | ||
var numberOfValidConnections = 0; | ||
var errorObject = null; | ||
static _getError(err, r) { | ||
if (err) { | ||
return err; | ||
} | ||
const cryptoMethod = this.cryptoMethod; | ||
let mechanism = 'SCRAM-SHA-1'; | ||
let processedPassword; | ||
if (r.$err || r.errmsg) { | ||
return new MongoError(r); | ||
} | ||
} | ||
if (cryptoMethod === 'sha256') { | ||
mechanism = 'SCRAM-SHA-256'; | ||
/** | ||
* @ignore | ||
*/ | ||
_executeScram(sendAuthCommand, connection, credentials, nonce, callback) { | ||
let username = credentials.username; | ||
const password = credentials.password; | ||
const db = credentials.source; | ||
let saslprepFn = (server.s && server.s.saslprep) || saslprep; | ||
const cryptoMethod = this.cryptoMethod; | ||
let mechanism = 'SCRAM-SHA-1'; | ||
let processedPassword; | ||
if (saslprepFn) { | ||
processedPassword = saslprepFn(password); | ||
if (cryptoMethod === 'sha256') { | ||
mechanism = 'SCRAM-SHA-256'; | ||
processedPassword = saslprep ? saslprep(password) : password; | ||
} else { | ||
console.warn('Warning: no saslprep library specified. Passwords will not be sanitized'); | ||
processedPassword = password; | ||
try { | ||
processedPassword = passwordDigest(username, password); | ||
} catch (e) { | ||
return callback(e); | ||
} | ||
} | ||
} else { | ||
processedPassword = passwordDigest(username, password); | ||
} | ||
// Execute MongoCR | ||
var executeScram = function(connection) { | ||
// Clean up the user | ||
username = username.replace('=', '=3D').replace(',', '=2C'); | ||
// Create a random nonce | ||
var nonce = crypto.randomBytes(24).toString('base64'); | ||
// var nonce = 'MsQUY9iw0T9fx2MUEz6LZPwGuhVvWAhc' | ||
// NOTE: This is done b/c Javascript uses UTF-16, but the server is hashing in UTF-8. | ||
@@ -199,5 +168,5 @@ // Since the username is not sasl-prep-d, we need to do this here. | ||
// Build command structure | ||
var cmd = { | ||
const saslStartCmd = { | ||
saslStart: 1, | ||
mechanism: mechanism, | ||
mechanism, | ||
payload: new Binary(Buffer.concat([Buffer.from('n,,', 'utf8'), firstBare])), | ||
@@ -207,228 +176,104 @@ autoAuthorize: 1 | ||
// Handle the error | ||
var handleError = function(err, r) { | ||
if (err) { | ||
numberOfValidConnections = numberOfValidConnections - 1; | ||
errorObject = err; | ||
return false; | ||
} else if (r.result['$err']) { | ||
errorObject = r.result; | ||
return false; | ||
} else if (r.result['errmsg']) { | ||
errorObject = r.result; | ||
return false; | ||
} else { | ||
numberOfValidConnections = numberOfValidConnections + 1; | ||
// Write the commmand on the connection | ||
sendAuthCommand(connection, `${db}.$cmd`, saslStartCmd, (err, r) => { | ||
let tmpError = ScramSHA._getError(err, r); | ||
if (tmpError) { | ||
return callback(tmpError, null); | ||
} | ||
return true; | ||
}; | ||
const dict = parsePayload(r.payload.value()); | ||
const iterations = parseInt(dict.i, 10); | ||
const salt = dict.s; | ||
const rnonce = dict.r; | ||
// Finish up | ||
var finish = function(_count, _numberOfValidConnections) { | ||
if (_count === 0 && _numberOfValidConnections > 0) { | ||
// Store the auth details | ||
addAuthSession(self.authStore, new AuthSession(db, username, password)); | ||
// Return correct authentication | ||
return callback(null, true); | ||
} else if (_count === 0) { | ||
if (errorObject == null) | ||
errorObject = new MongoError(f('failed to authenticate using scram')); | ||
return callback(errorObject, false); | ||
// Set up start of proof | ||
const withoutProof = `c=biws,r=${rnonce}`; | ||
const saltedPassword = HI( | ||
processedPassword, | ||
Buffer.from(salt, 'base64'), | ||
iterations, | ||
cryptoMethod | ||
); | ||
if (iterations && iterations < 4096) { | ||
const error = new MongoError(`Server returned an invalid iteration count ${iterations}`); | ||
return callback(error, false); | ||
} | ||
}; | ||
var handleEnd = function(_err, _r) { | ||
// Handle any error | ||
handleError(_err, _r); | ||
// Adjust the number of connections | ||
count = count - 1; | ||
// Execute the finish | ||
finish(count, numberOfValidConnections); | ||
}; | ||
const clientKey = HMAC(cryptoMethod, saltedPassword, 'Client Key'); | ||
const storedKey = H(cryptoMethod, clientKey); | ||
const authMessage = [firstBare, r.payload.value().toString('base64'), withoutProof].join(','); | ||
// Write the commmand on the connection | ||
server( | ||
connection, | ||
new Query(self.bson, f('%s.$cmd', db), cmd, { | ||
numberToSkip: 0, | ||
numberToReturn: 1 | ||
}), | ||
function(err, r) { | ||
// Do we have an error, handle it | ||
if (handleError(err, r) === false) { | ||
count = count - 1; | ||
const clientSignature = HMAC(cryptoMethod, storedKey, authMessage); | ||
const clientProof = `p=${xor(clientKey, clientSignature)}`; | ||
const clientFinal = [withoutProof, clientProof].join(','); | ||
const saslContinueCmd = { | ||
saslContinue: 1, | ||
conversationId: r.conversationId, | ||
payload: new Binary(Buffer.from(clientFinal)) | ||
}; | ||
if (count === 0 && numberOfValidConnections > 0) { | ||
// Store the auth details | ||
addAuthSession(self.authStore, new AuthSession(db, username, password)); | ||
// Return correct authentication | ||
return callback(null, true); | ||
} else if (count === 0) { | ||
if (errorObject == null) | ||
errorObject = new MongoError(f('failed to authenticate using scram')); | ||
return callback(errorObject, false); | ||
} | ||
return; | ||
sendAuthCommand(connection, `${db}.$cmd`, saslContinueCmd, (err, r) => { | ||
if (!r || r.done !== false) { | ||
return callback(err, r); | ||
} | ||
// Get the dictionary | ||
var dict = parsePayload(r.result.payload.value()); | ||
// Unpack dictionary | ||
var iterations = parseInt(dict.i, 10); | ||
var salt = dict.s; | ||
var rnonce = dict.r; | ||
// Set up start of proof | ||
var withoutProof = f('c=biws,r=%s', rnonce); | ||
var saltedPassword = HI( | ||
processedPassword, | ||
Buffer.from(salt, 'base64'), | ||
iterations, | ||
cryptoMethod | ||
); | ||
if (iterations && iterations < 4096) { | ||
const error = new MongoError(`Server returned an invalid iteration count ${iterations}`); | ||
return callback(error, false); | ||
} | ||
// Create the client key | ||
const clientKey = HMAC(cryptoMethod, saltedPassword, 'Client Key'); | ||
// Create the stored key | ||
const storedKey = H(cryptoMethod, clientKey); | ||
// Create the authentication message | ||
const authMessage = [ | ||
firstBare, | ||
r.result.payload.value().toString('base64'), | ||
withoutProof | ||
].join(','); | ||
// Create client signature | ||
const clientSignature = HMAC(cryptoMethod, storedKey, authMessage); | ||
// Create client proof | ||
const clientProof = f('p=%s', xor(clientKey, clientSignature)); | ||
// Create client final | ||
const clientFinal = [withoutProof, clientProof].join(','); | ||
// Create continue message | ||
const cmd = { | ||
const retrySaslContinueCmd = { | ||
saslContinue: 1, | ||
conversationId: r.result.conversationId, | ||
payload: new Binary(Buffer.from(clientFinal)) | ||
conversationId: r.conversationId, | ||
payload: Buffer.alloc(0) | ||
}; | ||
// | ||
// Execute sasl continue | ||
// Write the commmand on the connection | ||
server( | ||
connection, | ||
new Query(self.bson, f('%s.$cmd', db), cmd, { | ||
numberToSkip: 0, | ||
numberToReturn: 1 | ||
}), | ||
function(err, r) { | ||
if (r && r.result.done === false) { | ||
var cmd = { | ||
saslContinue: 1, | ||
conversationId: r.result.conversationId, | ||
payload: Buffer.alloc(0) | ||
}; | ||
sendAuthCommand(connection, `${db}.$cmd`, retrySaslContinueCmd, callback); | ||
}); | ||
}); | ||
} | ||
// Write the commmand on the connection | ||
server( | ||
connection, | ||
new Query(self.bson, f('%s.$cmd', db), cmd, { | ||
numberToSkip: 0, | ||
numberToReturn: 1 | ||
}), | ||
function(err, r) { | ||
handleEnd(err, r); | ||
} | ||
); | ||
} else { | ||
handleEnd(err, r); | ||
} | ||
} | ||
); | ||
/** | ||
* Implementation of authentication for a single connection | ||
* @override | ||
*/ | ||
_authenticateSingleConnection(sendAuthCommand, connection, credentials, callback) { | ||
// Create a random nonce | ||
crypto.randomBytes(24, (err, buff) => { | ||
if (err) { | ||
return callback(err, null); | ||
} | ||
); | ||
}; | ||
var _execute = function(_connection) { | ||
process.nextTick(function() { | ||
executeScram(_connection); | ||
return this._executeScram( | ||
sendAuthCommand, | ||
connection, | ||
credentials, | ||
buff.toString('base64'), | ||
callback | ||
); | ||
}); | ||
}; | ||
} | ||
// For each connection we need to authenticate | ||
while (connections.length > 0) { | ||
_execute(connections.shift()); | ||
/** | ||
* Authenticate | ||
* @override | ||
* @method | ||
*/ | ||
auth(sendAuthCommand, connections, credentials, callback) { | ||
this._checkSaslprep(); | ||
super.auth(sendAuthCommand, connections, credentials, callback); | ||
} | ||
}; | ||
// Add to store only if it does not exist | ||
var addAuthSession = function(authStore, session) { | ||
var found = false; | ||
_checkSaslprep() { | ||
const cryptoMethod = this.cryptoMethod; | ||
for (var i = 0; i < authStore.length; i++) { | ||
if (authStore[i].equal(session)) { | ||
found = true; | ||
break; | ||
if (cryptoMethod === 'sha256') { | ||
if (!saslprep) { | ||
console.warn('Warning: no saslprep library specified. Passwords will not be sanitized'); | ||
} | ||
} | ||
} | ||
} | ||
if (!found) authStore.push(session); | ||
}; | ||
/** | ||
* Remove authStore credentials | ||
* @method | ||
* @param {string} db Name of database we are removing authStore details about | ||
* @return {object} | ||
* Creates a new ScramSHA1 authentication mechanism | ||
* @class | ||
* @extends ScramSHA | ||
*/ | ||
ScramSHA.prototype.logout = function(dbName) { | ||
this.authStore = this.authStore.filter(function(x) { | ||
return x.db !== dbName; | ||
}); | ||
}; | ||
/** | ||
* Re authenticate pool | ||
* @method | ||
* @param {{Server}|{ReplSet}|{Mongos}} server Topology the authentication method is being called on | ||
* @param {[]Connections} connections Connections to authenticate using this authenticator | ||
* @param {authResultCallback} callback The callback to return the result from the authentication | ||
* @return {object} | ||
*/ | ||
ScramSHA.prototype.reauthenticate = function(server, connections, callback) { | ||
var authStore = this.authStore.slice(0); | ||
var count = authStore.length; | ||
// No connections | ||
if (count === 0) return callback(null, null); | ||
// Iterate over all the auth details stored | ||
for (var i = 0; i < authStore.length; i++) { | ||
this.auth( | ||
server, | ||
connections, | ||
authStore[i].db, | ||
authStore[i].username, | ||
authStore[i].password, | ||
function(err) { | ||
count = count - 1; | ||
// Done re-authenticating | ||
if (count === 0) { | ||
callback(err, null); | ||
} | ||
} | ||
); | ||
} | ||
}; | ||
class ScramSHA1 extends ScramSHA { | ||
@@ -440,2 +285,7 @@ constructor(bson) { | ||
/** | ||
* Creates a new ScramSHA256 authentication mechanism | ||
* @class | ||
* @extends ScramSHA | ||
*/ | ||
class ScramSHA256 extends ScramSHA { | ||
@@ -442,0 +292,0 @@ constructor(bson) { |
'use strict'; | ||
const f = require('util').format; | ||
const Query = require('../connection/commands').Query; | ||
const MongoError = require('../error').MongoError; | ||
const AuthProvider = require('./auth_provider').AuthProvider; | ||
const retrieveKerberos = require('../utils').retrieveKerberos; | ||
let kerberos; | ||
var AuthSession = function(db, username, password, options) { | ||
this.db = db; | ||
this.username = username; | ||
this.password = password; | ||
this.options = options; | ||
}; | ||
AuthSession.prototype.equal = function(session) { | ||
return ( | ||
session.db === this.db && | ||
session.username === this.username && | ||
session.password === this.password | ||
); | ||
}; | ||
/** | ||
* Creates a new SSPI authentication mechanism | ||
* @class | ||
* @return {SSPI} A cursor instance | ||
* @extends AuthProvider | ||
*/ | ||
var SSPI = function(bson) { | ||
this.bson = bson; | ||
this.authStore = []; | ||
}; | ||
class SSPI extends AuthProvider { | ||
/** | ||
* Implementation of authentication for a single connection | ||
* @override | ||
*/ | ||
_authenticateSingleConnection(sendAuthCommand, connection, credentials, callback) { | ||
// TODO: Destructure this | ||
const username = credentials.username; | ||
const password = credentials.password; | ||
const mechanismProperties = credentials.mechanismProperties; | ||
const gssapiServiceName = | ||
mechanismProperties['gssapiservicename'] || | ||
mechanismProperties['gssapiServiceName'] || | ||
'mongodb'; | ||
/** | ||
* Authenticate | ||
* @method | ||
* @param {{Server}|{ReplSet}|{Mongos}} server Topology the authentication method is being called on | ||
* @param {[]Connections} connections Connections to authenticate using this authenticator | ||
* @param {string} db Name of the database | ||
* @param {string} username Username | ||
* @param {string} password Password | ||
* @param {authResultCallback} callback The callback to return the result from the authentication | ||
* @return {object} | ||
*/ | ||
SSPI.prototype.auth = function(server, connections, db, username, password, options, callback) { | ||
var self = this; | ||
let kerberos; | ||
try { | ||
kerberos = retrieveKerberos(); | ||
} catch (e) { | ||
return callback(e, null); | ||
SSIPAuthenticate( | ||
this, | ||
kerberos.processes.MongoAuthProcess, | ||
username, | ||
password, | ||
gssapiServiceName, | ||
sendAuthCommand, | ||
connection, | ||
mechanismProperties, | ||
callback | ||
); | ||
} | ||
var gssapiServiceName = options['gssapiServiceName'] || 'mongodb'; | ||
// Total connections | ||
var count = connections.length; | ||
if (count === 0) return callback(null, null); | ||
/** | ||
* Authenticate | ||
* @override | ||
* @method | ||
*/ | ||
auth(sendAuthCommand, connections, credentials, callback) { | ||
if (kerberos == null) { | ||
try { | ||
kerberos = retrieveKerberos(); | ||
} catch (e) { | ||
return callback(e, null); | ||
} | ||
} | ||
// Valid connections | ||
var numberOfValidConnections = 0; | ||
var errorObject = null; | ||
// For each connection we need to authenticate | ||
while (connections.length > 0) { | ||
// Execute MongoCR | ||
var execute = function(connection) { | ||
// Start Auth process for a connection | ||
SSIPAuthenticate( | ||
self, | ||
kerberos.processes.MongoAuthProcess, | ||
username, | ||
password, | ||
gssapiServiceName, | ||
server, | ||
connection, | ||
options, | ||
function(err, r) { | ||
// Adjust count | ||
count = count - 1; | ||
// If we have an error | ||
if (err) { | ||
errorObject = err; | ||
} else if (r && typeof r === 'object' && r.result['$err']) { | ||
errorObject = r.result; | ||
} else if (r && typeof r === 'object' && r.result['errmsg']) { | ||
errorObject = r.result; | ||
} else { | ||
numberOfValidConnections = numberOfValidConnections + 1; | ||
} | ||
// We have authenticated all connections | ||
if (count === 0 && numberOfValidConnections > 0) { | ||
// Store the auth details | ||
addAuthSession(self.authStore, new AuthSession(db, username, password, options)); | ||
// Return correct authentication | ||
callback(null, true); | ||
} else if (count === 0) { | ||
if (errorObject == null) | ||
errorObject = new MongoError(f('failed to authenticate using mongocr')); | ||
callback(errorObject, false); | ||
} | ||
} | ||
); | ||
}; | ||
var _execute = function(_connection) { | ||
process.nextTick(function() { | ||
execute(_connection); | ||
}); | ||
}; | ||
_execute(connections.shift()); | ||
super.auth(sendAuthCommand, connections, credentials, callback); | ||
} | ||
}; | ||
} | ||
@@ -122,3 +64,3 @@ function SSIPAuthenticate( | ||
gssapiServiceName, | ||
server, | ||
sendAuthCommand, | ||
connection, | ||
@@ -136,8 +78,3 @@ options, | ||
function authCommand(command, authCb) { | ||
const query = new Query(self.bson, '$external.$cmd', command, { | ||
numberToSkip: 0, | ||
numberToReturn: 1 | ||
}); | ||
server(connection, query, authCb); | ||
sendAuthCommand(connection, '$external.$cmd', command, authCb); | ||
} | ||
@@ -158,5 +95,4 @@ | ||
authCommand(command, (err, result) => { | ||
authCommand(command, (err, doc) => { | ||
if (err) return callback(err, false); | ||
const doc = result.result; | ||
@@ -171,5 +107,4 @@ authProcess.transition(doc.payload, (err, payload) => { | ||
authCommand(command, (err, result) => { | ||
authCommand(command, (err, doc) => { | ||
if (err) return callback(err, false); | ||
const doc = result.result; | ||
@@ -200,68 +135,2 @@ authProcess.transition(doc.payload, (err, payload) => { | ||
// Add to store only if it does not exist | ||
var addAuthSession = function(authStore, session) { | ||
var found = false; | ||
for (var i = 0; i < authStore.length; i++) { | ||
if (authStore[i].equal(session)) { | ||
found = true; | ||
break; | ||
} | ||
} | ||
if (!found) authStore.push(session); | ||
}; | ||
/** | ||
* Remove authStore credentials | ||
* @method | ||
* @param {string} db Name of database we are removing authStore details about | ||
* @return {object} | ||
*/ | ||
SSPI.prototype.logout = function(dbName) { | ||
this.authStore = this.authStore.filter(function(x) { | ||
return x.db !== dbName; | ||
}); | ||
}; | ||
/** | ||
* Re authenticate pool | ||
* @method | ||
* @param {{Server}|{ReplSet}|{Mongos}} server Topology the authentication method is being called on | ||
* @param {[]Connections} connections Connections to authenticate using this authenticator | ||
* @param {authResultCallback} callback The callback to return the result from the authentication | ||
* @return {object} | ||
*/ | ||
SSPI.prototype.reauthenticate = function(server, connections, callback) { | ||
var authStore = this.authStore.slice(0); | ||
var count = authStore.length; | ||
if (count === 0) return callback(null, null); | ||
// Iterate over all the auth details stored | ||
for (var i = 0; i < authStore.length; i++) { | ||
this.auth( | ||
server, | ||
connections, | ||
authStore[i].db, | ||
authStore[i].username, | ||
authStore[i].password, | ||
authStore[i].options, | ||
function(err) { | ||
count = count - 1; | ||
// Done re-authenticating | ||
if (count === 0) { | ||
callback(err, null); | ||
} | ||
} | ||
); | ||
} | ||
}; | ||
/** | ||
* This is a result from a authentication strategy | ||
* | ||
* @callback authResultCallback | ||
* @param {error} error An error object. Set to null if no error present | ||
* @param {boolean} result The result of the authentication process | ||
*/ | ||
module.exports = SSPI; |
'use strict'; | ||
var f = require('util').format, | ||
Query = require('../connection/commands').Query, | ||
MongoError = require('../error').MongoError; | ||
const AuthProvider = require('./auth_provider').AuthProvider; | ||
var AuthSession = function(db, username, password) { | ||
this.db = db; | ||
this.username = username; | ||
this.password = password; | ||
}; | ||
AuthSession.prototype.equal = function(session) { | ||
return ( | ||
session.db === this.db && | ||
session.username === this.username && | ||
session.password === this.password | ||
); | ||
}; | ||
/** | ||
* Creates a new X509 authentication mechanism | ||
* @class | ||
* @return {X509} A cursor instance | ||
* @extends AuthProvider | ||
*/ | ||
var X509 = function(bson) { | ||
this.bson = bson; | ||
this.authStore = []; | ||
}; | ||
/** | ||
* Authenticate | ||
* @method | ||
* @param {{Server}|{ReplSet}|{Mongos}} server Topology the authentication method is being called on | ||
* @param {[]Connections} connections Connections to authenticate using this authenticator | ||
* @param {string} db Name of the database | ||
* @param {string} username Username | ||
* @param {string} password Password | ||
* @param {authResultCallback} callback The callback to return the result from the authentication | ||
* @return {object} | ||
*/ | ||
X509.prototype.auth = function(server, connections, db, username, password, callback) { | ||
var self = this; | ||
// Total connections | ||
var count = connections.length; | ||
if (count === 0) return callback(null, null); | ||
// Valid connections | ||
var numberOfValidConnections = 0; | ||
var errorObject = null; | ||
// For each connection we need to authenticate | ||
while (connections.length > 0) { | ||
// Execute MongoCR | ||
var execute = function(connection) { | ||
// Let's start the sasl process | ||
var command = { | ||
authenticate: 1, | ||
mechanism: 'MONGODB-X509' | ||
}; | ||
// Add username if specified | ||
if (username) { | ||
command.user = username; | ||
} | ||
// Let's start the process | ||
server( | ||
connection, | ||
new Query(self.bson, '$external.$cmd', command, { | ||
numberToSkip: 0, | ||
numberToReturn: 1 | ||
}), | ||
function(err, r) { | ||
// Adjust count | ||
count = count - 1; | ||
// If we have an error | ||
if (err) { | ||
errorObject = err; | ||
} else if (r.result['$err']) { | ||
errorObject = r.result; | ||
} else if (r.result['errmsg']) { | ||
errorObject = r.result; | ||
} else { | ||
numberOfValidConnections = numberOfValidConnections + 1; | ||
} | ||
// We have authenticated all connections | ||
if (count === 0 && numberOfValidConnections > 0) { | ||
// Store the auth details | ||
addAuthSession(self.authStore, new AuthSession(db, username, password)); | ||
// Return correct authentication | ||
callback(null, true); | ||
} else if (count === 0) { | ||
if (errorObject == null) | ||
errorObject = new MongoError(f('failed to authenticate using mongocr')); | ||
callback(errorObject, false); | ||
} | ||
} | ||
); | ||
}; | ||
var _execute = function(_connection) { | ||
process.nextTick(function() { | ||
execute(_connection); | ||
}); | ||
}; | ||
_execute(connections.shift()); | ||
} | ||
}; | ||
// Add to store only if it does not exist | ||
var addAuthSession = function(authStore, session) { | ||
var found = false; | ||
for (var i = 0; i < authStore.length; i++) { | ||
if (authStore[i].equal(session)) { | ||
found = true; | ||
break; | ||
class X509 extends AuthProvider { | ||
/** | ||
* Implementation of authentication for a single connection | ||
* @override | ||
*/ | ||
_authenticateSingleConnection(sendAuthCommand, connection, credentials, callback) { | ||
const username = credentials.username; | ||
const command = { authenticate: 1, mechanism: 'MONGODB-X509' }; | ||
if (username) { | ||
command.user = username; | ||
} | ||
} | ||
if (!found) authStore.push(session); | ||
}; | ||
/** | ||
* Remove authStore credentials | ||
* @method | ||
* @param {string} db Name of database we are removing authStore details about | ||
* @return {object} | ||
*/ | ||
X509.prototype.logout = function(dbName) { | ||
this.authStore = this.authStore.filter(function(x) { | ||
return x.db !== dbName; | ||
}); | ||
}; | ||
/** | ||
* Re authenticate pool | ||
* @method | ||
* @param {{Server}|{ReplSet}|{Mongos}} server Topology the authentication method is being called on | ||
* @param {[]Connections} connections Connections to authenticate using this authenticator | ||
* @param {authResultCallback} callback The callback to return the result from the authentication | ||
* @return {object} | ||
*/ | ||
X509.prototype.reauthenticate = function(server, connections, callback) { | ||
var authStore = this.authStore.slice(0); | ||
var count = authStore.length; | ||
if (count === 0) return callback(null, null); | ||
// Iterate over all the auth details stored | ||
for (var i = 0; i < authStore.length; i++) { | ||
this.auth( | ||
server, | ||
connections, | ||
authStore[i].db, | ||
authStore[i].username, | ||
authStore[i].password, | ||
function(err) { | ||
count = count - 1; | ||
// Done re-authenticating | ||
if (count === 0) { | ||
callback(err, null); | ||
} | ||
} | ||
); | ||
sendAuthCommand(connection, '$external.$cmd', command, callback); | ||
} | ||
}; | ||
} | ||
/** | ||
* This is a result from a authentication strategy | ||
* | ||
* @callback authResultCallback | ||
* @param {error} error An error object. Set to null if no error present | ||
* @param {boolean} result The result of the authentication process | ||
*/ | ||
module.exports = X509; |
'use strict'; | ||
const Msg = require('../connection/msg').Msg; | ||
const KillCursor = require('../connection/commands').KillCursor; | ||
@@ -20,3 +21,3 @@ const GetMore = require('../connection/commands').GetMore; | ||
// helper methods | ||
const extractCommandName = command => Object.keys(command)[0]; | ||
const extractCommandName = commandDoc => Object.keys(commandDoc)[0]; | ||
const namespace = command => command.ns; | ||
@@ -79,2 +80,6 @@ const databaseName = command => command.ns.split('.')[0]; | ||
if (command instanceof Msg) { | ||
return command.command; | ||
} | ||
if (command.query && command.query.$query) { | ||
@@ -147,2 +152,5 @@ let result; | ||
// in the event of a `noResponse` command, just return | ||
if (reply === null) return reply; | ||
return reply.result; | ||
@@ -149,0 +157,0 @@ }; |
@@ -6,3 +6,2 @@ 'use strict'; | ||
var Long = BSON.Long; | ||
const MongoError = require('../error').MongoError; | ||
const Buffer = require('safe-buffer').Buffer; | ||
@@ -468,49 +467,2 @@ | ||
// | ||
// Single document and documentsReturnedIn set | ||
// | ||
if (this.numberReturned === 1 && documentsReturnedIn != null && raw) { | ||
// Calculate the bson size | ||
bsonSize = | ||
this.data[this.index] | | ||
(this.data[this.index + 1] << 8) | | ||
(this.data[this.index + 2] << 16) | | ||
(this.data[this.index + 3] << 24); | ||
// Slice out the buffer containing the command result document | ||
var document = this.data.slice(this.index, this.index + bsonSize); | ||
// Set up field we wish to keep as raw | ||
var fieldsAsRaw = {}; | ||
fieldsAsRaw[documentsReturnedIn] = true; | ||
_options.fieldsAsRaw = fieldsAsRaw; | ||
// Deserialize but keep the array of documents in non-parsed form | ||
var doc = this.bson.deserialize(document, _options); | ||
if (doc instanceof Error) { | ||
throw doc; | ||
} | ||
if (doc.errmsg) { | ||
throw new MongoError(doc.errmsg); | ||
} | ||
if (!doc.cursor) { | ||
throw new MongoError('Cursor not found'); | ||
} | ||
// Get the documents | ||
this.documents = doc.cursor[documentsReturnedIn]; | ||
this.numberReturned = this.documents.length; | ||
// Ensure we have a Long valie cursor id | ||
this.cursorId = | ||
typeof doc.cursor.id === 'number' ? Long.fromNumber(doc.cursor.id) : doc.cursor.id; | ||
// Adjust the index | ||
this.index = this.index + bsonSize; | ||
// Set as parsed | ||
this.parsed = true; | ||
return; | ||
} | ||
// | ||
// Parse Body | ||
@@ -539,2 +491,11 @@ // | ||
if (this.documents.length === 1 && documentsReturnedIn != null && raw) { | ||
const fieldsAsRaw = {}; | ||
fieldsAsRaw[documentsReturnedIn] = true; | ||
_options.fieldsAsRaw = fieldsAsRaw; | ||
const doc = this.bson.deserialize(this.documents[0], _options); | ||
this.documents = [doc]; | ||
} | ||
// Set parsed | ||
@@ -541,0 +502,0 @@ this.parsed = true; |
'use strict'; | ||
var inherits = require('util').inherits, | ||
EventEmitter = require('events').EventEmitter, | ||
net = require('net'), | ||
tls = require('tls'), | ||
crypto = require('crypto'), | ||
f = require('util').format, | ||
debugOptions = require('./utils').debugOptions, | ||
parseHeader = require('../wireprotocol/shared').parseHeader, | ||
decompress = require('../wireprotocol/compression').decompress, | ||
Response = require('./commands').Response, | ||
MongoNetworkError = require('../error').MongoNetworkError, | ||
Logger = require('./logger'), | ||
OP_COMPRESSED = require('../wireprotocol/shared').opcodes.OP_COMPRESSED, | ||
MESSAGE_HEADER_SIZE = require('../wireprotocol/shared').MESSAGE_HEADER_SIZE, | ||
Buffer = require('safe-buffer').Buffer; | ||
const EventEmitter = require('events').EventEmitter; | ||
const crypto = require('crypto'); | ||
const debugOptions = require('./utils').debugOptions; | ||
const parseHeader = require('../wireprotocol/shared').parseHeader; | ||
const decompress = require('../wireprotocol/compression').decompress; | ||
const Response = require('./commands').Response; | ||
const BinMsg = require('./msg').BinMsg; | ||
const MongoNetworkError = require('../error').MongoNetworkError; | ||
const MongoError = require('../error').MongoError; | ||
const Logger = require('./logger'); | ||
const OP_COMPRESSED = require('../wireprotocol/shared').opcodes.OP_COMPRESSED; | ||
const OP_MSG = require('../wireprotocol/shared').opcodes.OP_MSG; | ||
const MESSAGE_HEADER_SIZE = require('../wireprotocol/shared').MESSAGE_HEADER_SIZE; | ||
const Buffer = require('safe-buffer').Buffer; | ||
var _id = 0; | ||
var debugFields = [ | ||
let _id = 0; | ||
const DEFAULT_MAX_BSON_MESSAGE_SIZE = 1024 * 1024 * 16 * 4; | ||
const DEBUG_FIELDS = [ | ||
'host', | ||
@@ -29,3 +30,2 @@ 'port', | ||
'socketTimeout', | ||
'singleBufferSerializtion', | ||
'ssl', | ||
@@ -42,29 +42,9 @@ 'ca', | ||
var connectionAccountingSpy = undefined; | ||
var connectionAccounting = false; | ||
var connections = {}; | ||
let connectionAccountingSpy = undefined; | ||
let connectionAccounting = false; | ||
let connections = {}; | ||
/** | ||
* Creates a new Connection instance | ||
* @class | ||
* @param {string} options.host The server host | ||
* @param {number} options.port The server port | ||
* @param {number} [options.family=null] IP version for DNS lookup, passed down to Node's [`dns.lookup()` function](https://nodejs.org/api/dns.html#dns_dns_lookup_hostname_options_callback). If set to `6`, will only look for ipv6 addresses. | ||
* @param {boolean} [options.keepAlive=true] TCP Connection keep alive enabled | ||
* @param {number} [options.keepAliveInitialDelay=300000] Initial delay before TCP keep alive enabled | ||
* @param {boolean} [options.noDelay=true] TCP Connection no delay | ||
* @param {number} [options.connectionTimeout=30000] TCP Connection timeout setting | ||
* @param {number} [options.socketTimeout=360000] TCP Socket timeout setting | ||
* @param {boolean} [options.singleBufferSerializtion=true] Serialize into single buffer, trade of peak memory for serialization speed | ||
* @param {boolean} [options.ssl=false] Use SSL for connection | ||
* @param {boolean|function} [options.checkServerIdentity=true] Ensure we check server identify during SSL, set to false to disable checking. Only works for Node 0.12.x or higher. You can pass in a boolean or your own checkServerIdentity override function. | ||
* @param {Buffer} [options.ca] SSL Certificate store binary buffer | ||
* @param {Buffer} [options.crl] SSL Certificate revocation store binary buffer | ||
* @param {Buffer} [options.cert] SSL Certificate binary buffer | ||
* @param {Buffer} [options.key] SSL Key file binary buffer | ||
* @param {string} [options.passphrase] SSL Certificate pass phrase | ||
* @param {boolean} [options.rejectUnauthorized=true] Reject unauthorized server certificates | ||
* @param {boolean} [options.promoteLongs=true] Convert Long values from the db into Numbers if they fit into 53 bits | ||
* @param {boolean} [options.promoteValues=true] Promotes BSON values to native types where possible, set to false to only receive wrapper types. | ||
* @param {boolean} [options.promoteBuffers=false] Promotes Binary BSON values to native Node Buffers. | ||
* A class representing a single connection to a MongoDB server | ||
* | ||
* @fires Connection#connect | ||
@@ -75,140 +55,227 @@ * @fires Connection#close | ||
* @fires Connection#parseError | ||
* @return {Connection} A cursor instance | ||
* @fires Connection#message | ||
*/ | ||
var Connection = function(messageHandler, options) { | ||
// Add event listener | ||
EventEmitter.call(this); | ||
// Set empty if no options passed | ||
this.options = options || {}; | ||
// Identification information | ||
this.id = _id++; | ||
// Logger instance | ||
this.logger = Logger('Connection', options); | ||
// No bson parser passed in | ||
if (!options.bson) throw new Error('must pass in valid bson parser'); | ||
// Get bson parser | ||
this.bson = options.bson; | ||
// Grouping tag used for debugging purposes | ||
this.tag = options.tag; | ||
// Message handler | ||
this.messageHandler = messageHandler; | ||
class Connection extends EventEmitter { | ||
/** | ||
* Creates a new Connection instance | ||
* | ||
* @param {Socket} socket The socket this connection wraps | ||
* @param {Object} [options] Optional settings | ||
* @param {string} [options.host] The host the socket is connected to | ||
* @param {number} [options.port] The port used for the socket connection | ||
* @param {boolean} [options.keepAlive=true] TCP Connection keep alive enabled | ||
* @param {number} [options.keepAliveInitialDelay=300000] Initial delay before TCP keep alive enabled | ||
* @param {number} [options.connectionTimeout=30000] TCP Connection timeout setting | ||
* @param {number} [options.socketTimeout=360000] TCP Socket timeout setting | ||
* @param {boolean} [options.promoteLongs] Convert Long values from the db into Numbers if they fit into 53 bits | ||
* @param {boolean} [options.promoteValues] Promotes BSON values to native types where possible, set to false to only receive wrapper types. | ||
* @param {boolean} [options.promoteBuffers] Promotes Binary BSON values to native Node Buffers. | ||
*/ | ||
constructor(socket, options) { | ||
super(); | ||
// Max BSON message size | ||
this.maxBsonMessageSize = options.maxBsonMessageSize || 1024 * 1024 * 16 * 4; | ||
// Debug information | ||
if (this.logger.isDebug()) | ||
this.logger.debug( | ||
f( | ||
'creating connection %s with options [%s]', | ||
this.id, | ||
JSON.stringify(debugOptions(debugFields, options)) | ||
) | ||
); | ||
options = options || {}; | ||
if (!options.bson) { | ||
throw new TypeError('must pass in valid bson parser'); | ||
} | ||
// Default options | ||
this.port = options.port || 27017; | ||
this.host = options.host || 'localhost'; | ||
this.family = typeof options.family === 'number' ? options.family : void 0; | ||
this.keepAlive = typeof options.keepAlive === 'boolean' ? options.keepAlive : true; | ||
this.keepAliveInitialDelay = | ||
typeof options.keepAliveInitialDelay === 'number' ? options.keepAliveInitialDelay : 300000; | ||
this.noDelay = typeof options.noDelay === 'boolean' ? options.noDelay : true; | ||
this.connectionTimeout = | ||
typeof options.connectionTimeout === 'number' ? options.connectionTimeout : 30000; | ||
this.socketTimeout = typeof options.socketTimeout === 'number' ? options.socketTimeout : 360000; | ||
this.id = _id++; | ||
this.options = options; | ||
this.logger = Logger('Connection', options); | ||
this.bson = options.bson; | ||
this.tag = options.tag; | ||
this.maxBsonMessageSize = options.maxBsonMessageSize || DEFAULT_MAX_BSON_MESSAGE_SIZE; | ||
// Is the keepAliveInitialDelay > socketTimeout set it to half of socketTimeout | ||
if (this.keepAliveInitialDelay > this.socketTimeout) { | ||
this.keepAliveInitialDelay = Math.round(this.socketTimeout / 2); | ||
this.port = options.port || 27017; | ||
this.host = options.host || 'localhost'; | ||
this.socketTimeout = typeof options.socketTimeout === 'number' ? options.socketTimeout : 360000; | ||
// These values are inspected directly in tests, but maybe not necessary to keep around | ||
this.keepAlive = typeof options.keepAlive === 'boolean' ? options.keepAlive : true; | ||
this.keepAliveInitialDelay = | ||
typeof options.keepAliveInitialDelay === 'number' ? options.keepAliveInitialDelay : 300000; | ||
this.connectionTimeout = | ||
typeof options.connectionTimeout === 'number' ? options.connectionTimeout : 30000; | ||
if (this.keepAliveInitialDelay > this.socketTimeout) { | ||
this.keepAliveInitialDelay = Math.round(this.socketTimeout / 2); | ||
} | ||
// Debug information | ||
if (this.logger.isDebug()) { | ||
this.logger.debug( | ||
`creating connection ${this.id} with options [${JSON.stringify( | ||
debugOptions(DEBUG_FIELDS, options) | ||
)}]` | ||
); | ||
} | ||
// Response options | ||
this.responseOptions = { | ||
promoteLongs: typeof options.promoteLongs === 'boolean' ? options.promoteLongs : true, | ||
promoteValues: typeof options.promoteValues === 'boolean' ? options.promoteValues : true, | ||
promoteBuffers: typeof options.promoteBuffers === 'boolean' ? options.promoteBuffers : false | ||
}; | ||
// Flushing | ||
this.flushing = false; | ||
this.queue = []; | ||
// Internal state | ||
this.writeStream = null; | ||
this.destroyed = false; | ||
// Create hash method | ||
const hash = crypto.createHash('sha1'); | ||
hash.update(this.address); | ||
this.hashedName = hash.digest('hex'); | ||
// All operations in flight on the connection | ||
this.workItems = []; | ||
// setup socket | ||
this.socket = socket; | ||
this.socket.once('error', errorHandler(this)); | ||
this.socket.once('timeout', timeoutHandler(this)); | ||
this.socket.once('close', closeHandler(this)); | ||
this.socket.on('data', dataHandler(this)); | ||
if (connectionAccounting) { | ||
addConnection(this.id, this); | ||
} | ||
} | ||
// If connection was destroyed | ||
this.destroyed = false; | ||
setSocketTimeout(value) { | ||
if (this.socket) { | ||
this.socket.setTimeout(value); | ||
} | ||
} | ||
// Check if we have a domain socket | ||
this.domainSocket = this.host.indexOf('/') !== -1; | ||
resetSocketTimeout() { | ||
if (this.socket) { | ||
this.socket.setTimeout(this.socketTimeout); | ||
} | ||
} | ||
// Serialize commands using function | ||
this.singleBufferSerializtion = | ||
typeof options.singleBufferSerializtion === 'boolean' ? options.singleBufferSerializtion : true; | ||
this.serializationFunction = this.singleBufferSerializtion ? 'toBinUnified' : 'toBin'; | ||
static enableConnectionAccounting(spy) { | ||
if (spy) { | ||
connectionAccountingSpy = spy; | ||
} | ||
// SSL options | ||
this.ca = options.ca || null; | ||
this.crl = options.crl || null; | ||
this.cert = options.cert || null; | ||
this.key = options.key || null; | ||
this.passphrase = options.passphrase || null; | ||
this.ciphers = options.ciphers || null; | ||
this.ecdhCurve = options.ecdhCurve || null; | ||
this.ssl = typeof options.ssl === 'boolean' ? options.ssl : false; | ||
this.rejectUnauthorized = | ||
typeof options.rejectUnauthorized === 'boolean' ? options.rejectUnauthorized : true; | ||
this.checkServerIdentity = | ||
typeof options.checkServerIdentity === 'boolean' || | ||
typeof options.checkServerIdentity === 'function' | ||
? options.checkServerIdentity | ||
: true; | ||
connectionAccounting = true; | ||
connections = {}; | ||
} | ||
// If ssl not enabled | ||
if (!this.ssl) this.rejectUnauthorized = false; | ||
static disableConnectionAccounting() { | ||
connectionAccounting = false; | ||
connectionAccountingSpy = undefined; | ||
} | ||
// Response options | ||
this.responseOptions = { | ||
promoteLongs: typeof options.promoteLongs === 'boolean' ? options.promoteLongs : true, | ||
promoteValues: typeof options.promoteValues === 'boolean' ? options.promoteValues : true, | ||
promoteBuffers: typeof options.promoteBuffers === 'boolean' ? options.promoteBuffers : false | ||
}; | ||
static connections() { | ||
return connections; | ||
} | ||
// Flushing | ||
this.flushing = false; | ||
this.queue = []; | ||
get address() { | ||
return `${this.host}:${this.port}`; | ||
} | ||
// Internal state | ||
this.connection = null; | ||
this.writeStream = null; | ||
/** | ||
* Unref this connection | ||
* @method | ||
* @return {boolean} | ||
*/ | ||
unref() { | ||
if (this.socket == null) { | ||
this.once('connect', () => this.socket.unref()); | ||
return; | ||
} | ||
// Create hash method | ||
var hash = crypto.createHash('sha1'); | ||
hash.update(f('%s:%s', this.host, this.port)); | ||
this.socket.unref(); | ||
} | ||
// Create a hash name | ||
this.hashedName = hash.digest('hex'); | ||
/** | ||
* Destroy connection | ||
* @method | ||
*/ | ||
destroy() { | ||
if (connectionAccounting) { | ||
deleteConnection(this.id); | ||
} | ||
// All operations in flight on the connection | ||
this.workItems = []; | ||
}; | ||
if (this.socket) { | ||
// Catch posssible exception thrown by node 0.10.x | ||
try { | ||
this.socket.end(); | ||
} catch (err) {} // eslint-disable-line | ||
inherits(Connection, EventEmitter); | ||
this.socket.destroy(); | ||
} | ||
Connection.prototype.setSocketTimeout = function(value) { | ||
if (this.connection) { | ||
this.connection.setTimeout(value); | ||
this.destroyed = true; | ||
} | ||
}; | ||
Connection.prototype.resetSocketTimeout = function() { | ||
if (this.connection) { | ||
this.connection.setTimeout(this.socketTimeout); | ||
/** | ||
* Write to connection | ||
* @method | ||
* @param {Command} command Command to write out need to implement toBin and toBinUnified | ||
*/ | ||
write(buffer) { | ||
// Debug Log | ||
if (this.logger.isDebug()) { | ||
if (!Array.isArray(buffer)) { | ||
this.logger.debug(`writing buffer [${buffer.toString('hex')}] to ${this.address}`); | ||
} else { | ||
for (let i = 0; i < buffer.length; i++) | ||
this.logger.debug(`writing buffer [${buffer[i].toString('hex')}] to ${this.address}`); | ||
} | ||
} | ||
// Double check that the connection is not destroyed | ||
if (this.socket.destroyed === false) { | ||
// Write out the command | ||
if (!Array.isArray(buffer)) { | ||
this.socket.write(buffer, 'binary'); | ||
return true; | ||
} | ||
// Iterate over all buffers and write them in order to the socket | ||
for (let i = 0; i < buffer.length; i++) { | ||
this.socket.write(buffer[i], 'binary'); | ||
} | ||
return true; | ||
} | ||
// Connection is destroyed return write failed | ||
return false; | ||
} | ||
}; | ||
Connection.enableConnectionAccounting = function(spy) { | ||
if (spy) { | ||
connectionAccountingSpy = spy; | ||
/** | ||
* Return id of connection as a string | ||
* @method | ||
* @return {string} | ||
*/ | ||
toString() { | ||
return '' + this.id; | ||
} | ||
connectionAccounting = true; | ||
connections = {}; | ||
}; | ||
/** | ||
* Return json object of connection | ||
* @method | ||
* @return {object} | ||
*/ | ||
toJSON() { | ||
return { id: this.id, host: this.host, port: this.port }; | ||
} | ||
Connection.disableConnectionAccounting = function() { | ||
connectionAccounting = false; | ||
connectionAccountingSpy = undefined; | ||
}; | ||
/** | ||
* Is the connection connected | ||
* @method | ||
* @return {boolean} | ||
*/ | ||
isConnected() { | ||
if (this.destroyed) return false; | ||
return !this.socket.destroyed && this.socket.writable; | ||
} | ||
} | ||
Connection.connections = function() { | ||
return connections; | ||
}; | ||
function deleteConnection(id) { | ||
@@ -234,95 +301,112 @@ // console.log("=== deleted connection " + id + " :: " + (connections[id] ? connections[id].port : '')) | ||
// Connection handlers | ||
var errorHandler = function(self) { | ||
function errorHandler(conn) { | ||
return function(err) { | ||
if (connectionAccounting) deleteConnection(self.id); | ||
if (connectionAccounting) deleteConnection(conn.id); | ||
// Debug information | ||
if (self.logger.isDebug()) | ||
self.logger.debug( | ||
f( | ||
'connection %s for [%s:%s] errored out with [%s]', | ||
self.id, | ||
self.host, | ||
self.port, | ||
JSON.stringify(err) | ||
) | ||
if (conn.logger.isDebug()) { | ||
conn.logger.debug( | ||
`connection ${conn.id} for [${conn.address}] errored out with [${JSON.stringify(err)}]` | ||
); | ||
// Emit the error | ||
if (self.listeners('error').length > 0) self.emit('error', new MongoNetworkError(err), self); | ||
} | ||
conn.emit('error', new MongoNetworkError(err), conn); | ||
}; | ||
}; | ||
} | ||
var timeoutHandler = function(self) { | ||
function timeoutHandler(conn) { | ||
return function() { | ||
if (connectionAccounting) deleteConnection(self.id); | ||
// Debug information | ||
if (self.logger.isDebug()) | ||
self.logger.debug(f('connection %s for [%s:%s] timed out', self.id, self.host, self.port)); | ||
// Emit timeout error | ||
self.emit( | ||
if (connectionAccounting) deleteConnection(conn.id); | ||
if (conn.logger.isDebug()) { | ||
conn.logger.debug(`connection ${conn.id} for [${conn.address}] timed out`); | ||
} | ||
conn.emit( | ||
'timeout', | ||
new MongoNetworkError(f('connection %s to %s:%s timed out', self.id, self.host, self.port)), | ||
self | ||
new MongoNetworkError(`connection ${conn.id} to ${conn.address} timed out`), | ||
conn | ||
); | ||
}; | ||
}; | ||
} | ||
var closeHandler = function(self) { | ||
function closeHandler(conn) { | ||
return function(hadError) { | ||
if (connectionAccounting) deleteConnection(self.id); | ||
// Debug information | ||
if (self.logger.isDebug()) | ||
self.logger.debug(f('connection %s with for [%s:%s] closed', self.id, self.host, self.port)); | ||
if (connectionAccounting) deleteConnection(conn.id); | ||
// Emit close event | ||
if (conn.logger.isDebug()) { | ||
conn.logger.debug(`connection ${conn.id} with for [${conn.address}] closed`); | ||
} | ||
if (!hadError) { | ||
self.emit( | ||
conn.emit( | ||
'close', | ||
new MongoNetworkError(f('connection %s to %s:%s closed', self.id, self.host, self.port)), | ||
self | ||
new MongoNetworkError(`connection ${conn.id} to ${conn.address} closed`), | ||
conn | ||
); | ||
} | ||
}; | ||
}; | ||
} | ||
// Handle a message once it is received | ||
var emitMessageHandler = function(self, message) { | ||
var msgHeader = parseHeader(message); | ||
if (msgHeader.opCode === OP_COMPRESSED) { | ||
msgHeader.fromCompressed = true; | ||
var index = MESSAGE_HEADER_SIZE; | ||
msgHeader.opCode = message.readInt32LE(index); | ||
index += 4; | ||
msgHeader.length = message.readInt32LE(index); | ||
index += 4; | ||
var compressorID = message[index]; | ||
index++; | ||
decompress(compressorID, message.slice(index), function(err, decompressedMsgBody) { | ||
if (err) { | ||
throw err; | ||
} | ||
if (decompressedMsgBody.length !== msgHeader.length) { | ||
throw new Error( | ||
'Decompressing a compressed message from the server failed. The message is corrupt.' | ||
); | ||
} | ||
self.messageHandler( | ||
new Response(self.bson, message, msgHeader, decompressedMsgBody, self.responseOptions), | ||
self | ||
); | ||
}); | ||
} else { | ||
self.messageHandler( | ||
new Response( | ||
self.bson, | ||
function processMessage(conn, message) { | ||
const msgHeader = parseHeader(message); | ||
if (msgHeader.opCode !== OP_COMPRESSED) { | ||
const ResponseConstructor = msgHeader.opCode === OP_MSG ? BinMsg : Response; | ||
conn.emit( | ||
'message', | ||
new ResponseConstructor( | ||
conn.bson, | ||
message, | ||
msgHeader, | ||
message.slice(MESSAGE_HEADER_SIZE), | ||
self.responseOptions | ||
conn.responseOptions | ||
), | ||
self | ||
conn | ||
); | ||
return; | ||
} | ||
}; | ||
var dataHandler = function(self) { | ||
msgHeader.fromCompressed = true; | ||
let index = MESSAGE_HEADER_SIZE; | ||
msgHeader.opCode = message.readInt32LE(index); | ||
index += 4; | ||
msgHeader.length = message.readInt32LE(index); | ||
index += 4; | ||
const compressorID = message[index]; | ||
index++; | ||
decompress(compressorID, message.slice(index), (err, decompressedMsgBody) => { | ||
if (err) { | ||
conn.emit('error', err); | ||
return; | ||
} | ||
if (decompressedMsgBody.length !== msgHeader.length) { | ||
conn.emit( | ||
'error', | ||
new MongoError( | ||
'Decompressing a compressed message from the server failed. The message is corrupt.' | ||
) | ||
); | ||
return; | ||
} | ||
const ResponseConstructor = msgHeader.opCode === OP_MSG ? BinMsg : Response; | ||
conn.emit( | ||
'message', | ||
new ResponseConstructor( | ||
conn.bson, | ||
message, | ||
msgHeader, | ||
decompressedMsgBody, | ||
conn.responseOptions | ||
), | ||
conn | ||
); | ||
}); | ||
} | ||
function dataHandler(conn) { | ||
return function(data) { | ||
@@ -332,11 +416,11 @@ // Parse until we are done with the data | ||
// If we still have bytes to read on the current message | ||
if (self.bytesRead > 0 && self.sizeOfMessage > 0) { | ||
if (conn.bytesRead > 0 && conn.sizeOfMessage > 0) { | ||
// Calculate the amount of remaining bytes | ||
var remainingBytesToRead = self.sizeOfMessage - self.bytesRead; | ||
const remainingBytesToRead = conn.sizeOfMessage - conn.bytesRead; | ||
// Check if the current chunk contains the rest of the message | ||
if (remainingBytesToRead > data.length) { | ||
// Copy the new data into the exiting buffer (should have been allocated when we know the message size) | ||
data.copy(self.buffer, self.bytesRead); | ||
data.copy(conn.buffer, conn.bytesRead); | ||
// Adjust the number of bytes read so it point to the correct index in the buffer | ||
self.bytesRead = self.bytesRead + data.length; | ||
conn.bytesRead = conn.bytesRead + data.length; | ||
@@ -347,3 +431,3 @@ // Reset state of buffer | ||
// Copy the missing part of the data into our current buffer | ||
data.copy(self.buffer, self.bytesRead, 0, remainingBytesToRead); | ||
data.copy(conn.buffer, conn.bytesRead, 0, remainingBytesToRead); | ||
// Slice the overflow into a new buffer that we will then re-parse | ||
@@ -353,25 +437,10 @@ data = data.slice(remainingBytesToRead); | ||
// Emit current complete message | ||
try { | ||
var emitBuffer = self.buffer; | ||
// Reset state of buffer | ||
self.buffer = null; | ||
self.sizeOfMessage = 0; | ||
self.bytesRead = 0; | ||
self.stubBuffer = null; | ||
const emitBuffer = conn.buffer; | ||
// Reset state of buffer | ||
conn.buffer = null; | ||
conn.sizeOfMessage = 0; | ||
conn.bytesRead = 0; | ||
conn.stubBuffer = null; | ||
emitMessageHandler(self, emitBuffer); | ||
} catch (err) { | ||
var errorObject = { | ||
err: 'socketHandler', | ||
trace: err, | ||
bin: self.buffer, | ||
parseState: { | ||
sizeOfMessage: self.sizeOfMessage, | ||
bytesRead: self.bytesRead, | ||
stubBuffer: self.stubBuffer | ||
} | ||
}; | ||
// We got a parse Error fire it off then keep going | ||
self.emit('parseError', errorObject, self); | ||
} | ||
processMessage(conn, emitBuffer); | ||
} | ||
@@ -381,9 +450,9 @@ } else { | ||
// size of the message (< 4 bytes) | ||
if (self.stubBuffer != null && self.stubBuffer.length > 0) { | ||
if (conn.stubBuffer != null && conn.stubBuffer.length > 0) { | ||
// If we have enough bytes to determine the message size let's do it | ||
if (self.stubBuffer.length + data.length > 4) { | ||
if (conn.stubBuffer.length + data.length > 4) { | ||
// Prepad the data | ||
var newData = Buffer.alloc(self.stubBuffer.length + data.length); | ||
self.stubBuffer.copy(newData, 0); | ||
data.copy(newData, self.stubBuffer.length); | ||
const newData = Buffer.alloc(conn.stubBuffer.length + data.length); | ||
conn.stubBuffer.copy(newData, 0); | ||
data.copy(newData, conn.stubBuffer.length); | ||
// Reassign for parsing | ||
@@ -393,13 +462,13 @@ data = newData; | ||
// Reset state of buffer | ||
self.buffer = null; | ||
self.sizeOfMessage = 0; | ||
self.bytesRead = 0; | ||
self.stubBuffer = null; | ||
conn.buffer = null; | ||
conn.sizeOfMessage = 0; | ||
conn.bytesRead = 0; | ||
conn.stubBuffer = null; | ||
} else { | ||
// Add the the bytes to the stub buffer | ||
var newStubBuffer = Buffer.alloc(self.stubBuffer.length + data.length); | ||
const newStubBuffer = Buffer.alloc(conn.stubBuffer.length + data.length); | ||
// Copy existing stub buffer | ||
self.stubBuffer.copy(newStubBuffer, 0); | ||
conn.stubBuffer.copy(newStubBuffer, 0); | ||
// Copy missing part of the data | ||
data.copy(newStubBuffer, self.stubBuffer.length); | ||
data.copy(newStubBuffer, conn.stubBuffer.length); | ||
// Exit parsing loop | ||
@@ -411,18 +480,17 @@ data = Buffer.alloc(0); | ||
// Retrieve the message size | ||
// var sizeOfMessage = data.readUInt32LE(0); | ||
var sizeOfMessage = data[0] | (data[1] << 8) | (data[2] << 16) | (data[3] << 24); | ||
const sizeOfMessage = data[0] | (data[1] << 8) | (data[2] << 16) | (data[3] << 24); | ||
// If we have a negative sizeOfMessage emit error and return | ||
if (sizeOfMessage < 0 || sizeOfMessage > self.maxBsonMessageSize) { | ||
errorObject = { | ||
if (sizeOfMessage < 0 || sizeOfMessage > conn.maxBsonMessageSize) { | ||
const errorObject = { | ||
err: 'socketHandler', | ||
trace: '', | ||
bin: self.buffer, | ||
bin: conn.buffer, | ||
parseState: { | ||
sizeOfMessage: sizeOfMessage, | ||
bytesRead: self.bytesRead, | ||
stubBuffer: self.stubBuffer | ||
bytesRead: conn.bytesRead, | ||
stubBuffer: conn.stubBuffer | ||
} | ||
}; | ||
// We got a parse Error fire it off then keep going | ||
self.emit('parseError', errorObject, self); | ||
conn.emit('parseError', errorObject, conn); | ||
return; | ||
@@ -434,14 +502,14 @@ } | ||
sizeOfMessage > 4 && | ||
sizeOfMessage < self.maxBsonMessageSize && | ||
sizeOfMessage < conn.maxBsonMessageSize && | ||
sizeOfMessage > data.length | ||
) { | ||
self.buffer = Buffer.alloc(sizeOfMessage); | ||
conn.buffer = Buffer.alloc(sizeOfMessage); | ||
// Copy all the data into the buffer | ||
data.copy(self.buffer, 0); | ||
data.copy(conn.buffer, 0); | ||
// Update bytes read | ||
self.bytesRead = data.length; | ||
conn.bytesRead = data.length; | ||
// Update sizeOfMessage | ||
self.sizeOfMessage = sizeOfMessage; | ||
conn.sizeOfMessage = sizeOfMessage; | ||
// Ensure stub buffer is null | ||
self.stubBuffer = null; | ||
conn.stubBuffer = null; | ||
// Exit parsing loop | ||
@@ -451,21 +519,17 @@ data = Buffer.alloc(0); | ||
sizeOfMessage > 4 && | ||
sizeOfMessage < self.maxBsonMessageSize && | ||
sizeOfMessage < conn.maxBsonMessageSize && | ||
sizeOfMessage === data.length | ||
) { | ||
try { | ||
emitBuffer = data; | ||
// Reset state of buffer | ||
self.buffer = null; | ||
self.sizeOfMessage = 0; | ||
self.bytesRead = 0; | ||
self.stubBuffer = null; | ||
// Exit parsing loop | ||
data = Buffer.alloc(0); | ||
// Emit the message | ||
emitMessageHandler(self, emitBuffer); | ||
} catch (err) { | ||
self.emit('parseError', err, self); | ||
} | ||
} else if (sizeOfMessage <= 4 || sizeOfMessage > self.maxBsonMessageSize) { | ||
errorObject = { | ||
const emitBuffer = data; | ||
// Reset state of buffer | ||
conn.buffer = null; | ||
conn.sizeOfMessage = 0; | ||
conn.bytesRead = 0; | ||
conn.stubBuffer = null; | ||
// Exit parsing loop | ||
data = Buffer.alloc(0); | ||
// Emit the message | ||
processMessage(conn, emitBuffer); | ||
} else if (sizeOfMessage <= 4 || sizeOfMessage > conn.maxBsonMessageSize) { | ||
const errorObject = { | ||
err: 'socketHandler', | ||
@@ -482,28 +546,28 @@ trace: null, | ||
// We got a parse Error fire it off then keep going | ||
self.emit('parseError', errorObject, self); | ||
conn.emit('parseError', errorObject, conn); | ||
// Clear out the state of the parser | ||
self.buffer = null; | ||
self.sizeOfMessage = 0; | ||
self.bytesRead = 0; | ||
self.stubBuffer = null; | ||
conn.buffer = null; | ||
conn.sizeOfMessage = 0; | ||
conn.bytesRead = 0; | ||
conn.stubBuffer = null; | ||
// Exit parsing loop | ||
data = Buffer.alloc(0); | ||
} else { | ||
emitBuffer = data.slice(0, sizeOfMessage); | ||
const emitBuffer = data.slice(0, sizeOfMessage); | ||
// Reset state of buffer | ||
self.buffer = null; | ||
self.sizeOfMessage = 0; | ||
self.bytesRead = 0; | ||
self.stubBuffer = null; | ||
conn.buffer = null; | ||
conn.sizeOfMessage = 0; | ||
conn.bytesRead = 0; | ||
conn.stubBuffer = null; | ||
// Copy rest of message | ||
data = data.slice(sizeOfMessage); | ||
// Emit the message | ||
emitMessageHandler(self, emitBuffer); | ||
processMessage(conn, emitBuffer); | ||
} | ||
} else { | ||
// Create a buffer that contains the space for the non-complete message | ||
self.stubBuffer = Buffer.alloc(data.length); | ||
conn.stubBuffer = Buffer.alloc(data.length); | ||
// Copy the data to the stub buffer | ||
data.copy(self.stubBuffer, 0); | ||
data.copy(conn.stubBuffer, 0); | ||
// Exit parsing loop | ||
@@ -516,270 +580,5 @@ data = Buffer.alloc(0); | ||
}; | ||
}; | ||
// List of socket level valid ssl options | ||
var legalSslSocketOptions = [ | ||
'pfx', | ||
'key', | ||
'passphrase', | ||
'cert', | ||
'ca', | ||
'ciphers', | ||
'NPNProtocols', | ||
'ALPNProtocols', | ||
'servername', | ||
'ecdhCurve', | ||
'secureProtocol', | ||
'secureContext', | ||
'session', | ||
'minDHSize' | ||
]; | ||
function merge(options1, options2) { | ||
// Merge in any allowed ssl options | ||
for (var name in options2) { | ||
if (options2[name] != null && legalSslSocketOptions.indexOf(name) !== -1) { | ||
options1[name] = options2[name]; | ||
} | ||
} | ||
} | ||
function makeSSLConnection(self, _options) { | ||
let sslOptions = { | ||
socket: self.connection, | ||
rejectUnauthorized: self.rejectUnauthorized | ||
}; | ||
// Merge in options | ||
merge(sslOptions, self.options); | ||
merge(sslOptions, _options); | ||
// Set options for ssl | ||
if (self.ca) sslOptions.ca = self.ca; | ||
if (self.crl) sslOptions.crl = self.crl; | ||
if (self.cert) sslOptions.cert = self.cert; | ||
if (self.key) sslOptions.key = self.key; | ||
if (self.passphrase) sslOptions.passphrase = self.passphrase; | ||
// Override checkServerIdentity behavior | ||
if (self.checkServerIdentity === false) { | ||
// Skip the identiy check by retuning undefined as per node documents | ||
// https://nodejs.org/api/tls.html#tls_tls_connect_options_callback | ||
sslOptions.checkServerIdentity = function() { | ||
return undefined; | ||
}; | ||
} else if (typeof self.checkServerIdentity === 'function') { | ||
sslOptions.checkServerIdentity = self.checkServerIdentity; | ||
} | ||
// Set default sni servername to be the same as host | ||
if (sslOptions.servername == null) { | ||
sslOptions.servername = self.host; | ||
} | ||
// Attempt SSL connection | ||
const connection = tls.connect(self.port, self.host, sslOptions, function() { | ||
// Error on auth or skip | ||
if (connection.authorizationError && self.rejectUnauthorized) { | ||
return self.emit('error', connection.authorizationError, self, { ssl: true }); | ||
} | ||
// Set socket timeout instead of connection timeout | ||
connection.setTimeout(self.socketTimeout); | ||
// We are done emit connect | ||
self.emit('connect', self); | ||
}); | ||
// Set the options for the connection | ||
connection.setKeepAlive(self.keepAlive, self.keepAliveInitialDelay); | ||
connection.setTimeout(self.connectionTimeout); | ||
connection.setNoDelay(self.noDelay); | ||
return connection; | ||
} | ||
function makeUnsecureConnection(self, family) { | ||
// Create new connection instance | ||
let connection_options; | ||
if (self.domainSocket) { | ||
connection_options = { path: self.host }; | ||
} else { | ||
connection_options = { port: self.port, host: self.host }; | ||
connection_options.family = family; | ||
} | ||
const connection = net.createConnection(connection_options); | ||
// Set the options for the connection | ||
connection.setKeepAlive(self.keepAlive, self.keepAliveInitialDelay); | ||
connection.setTimeout(self.connectionTimeout); | ||
connection.setNoDelay(self.noDelay); | ||
connection.once('connect', function() { | ||
// Set socket timeout instead of connection timeout | ||
connection.setTimeout(self.socketTimeout); | ||
// Emit connect event | ||
self.emit('connect', self); | ||
}); | ||
return connection; | ||
} | ||
function doConnect(self, family, _options, _errorHandler) { | ||
self.connection = self.ssl | ||
? makeSSLConnection(self, _options) | ||
: makeUnsecureConnection(self, family); | ||
// Add handlers for events | ||
self.connection.once('error', _errorHandler); | ||
self.connection.once('timeout', timeoutHandler(self)); | ||
self.connection.once('close', closeHandler(self)); | ||
self.connection.on('data', dataHandler(self)); | ||
} | ||
/** | ||
* Connect | ||
* @method | ||
*/ | ||
Connection.prototype.connect = function(_options) { | ||
_options = _options || {}; | ||
// Set the connections | ||
if (connectionAccounting) addConnection(this.id, this); | ||
// Check if we are overriding the promoteLongs | ||
if (typeof _options.promoteLongs === 'boolean') { | ||
this.responseOptions.promoteLongs = _options.promoteLongs; | ||
this.responseOptions.promoteValues = _options.promoteValues; | ||
this.responseOptions.promoteBuffers = _options.promoteBuffers; | ||
} | ||
const _errorHandler = errorHandler(this); | ||
if (this.family !== void 0) { | ||
return doConnect(this, this.family, _options, _errorHandler); | ||
} | ||
return doConnect(this, 6, _options, err => { | ||
if (this.logger.isDebug()) { | ||
this.logger.debug( | ||
f( | ||
'connection %s for [%s:%s] errored out with [%s]', | ||
this.id, | ||
this.host, | ||
this.port, | ||
JSON.stringify(err) | ||
) | ||
); | ||
} | ||
// clean up existing event handlers | ||
this.connection.removeAllListeners('error'); | ||
this.connection.removeAllListeners('timeout'); | ||
this.connection.removeAllListeners('close'); | ||
this.connection.removeAllListeners('data'); | ||
this.connection = undefined; | ||
return doConnect(this, 4, _options, _errorHandler); | ||
}); | ||
}; | ||
/** | ||
* Unref this connection | ||
* @method | ||
* @return {boolean} | ||
*/ | ||
Connection.prototype.unref = function() { | ||
if (this.connection) this.connection.unref(); | ||
else { | ||
var self = this; | ||
this.once('connect', function() { | ||
self.connection.unref(); | ||
}); | ||
} | ||
}; | ||
/** | ||
* Destroy connection | ||
* @method | ||
*/ | ||
Connection.prototype.destroy = function() { | ||
// Set the connections | ||
if (connectionAccounting) deleteConnection(this.id); | ||
if (this.connection) { | ||
// Catch posssible exception thrown by node 0.10.x | ||
try { | ||
this.connection.end(); | ||
} catch (err) {} // eslint-disable-line | ||
// Destroy connection | ||
this.connection.destroy(); | ||
} | ||
this.destroyed = true; | ||
}; | ||
/** | ||
* Write to connection | ||
* @method | ||
* @param {Command} command Command to write out need to implement toBin and toBinUnified | ||
*/ | ||
Connection.prototype.write = function(buffer) { | ||
var i; | ||
// Debug Log | ||
if (this.logger.isDebug()) { | ||
if (!Array.isArray(buffer)) { | ||
this.logger.debug( | ||
f('writing buffer [%s] to %s:%s', buffer.toString('hex'), this.host, this.port) | ||
); | ||
} else { | ||
for (i = 0; i < buffer.length; i++) | ||
this.logger.debug( | ||
f('writing buffer [%s] to %s:%s', buffer[i].toString('hex'), this.host, this.port) | ||
); | ||
} | ||
} | ||
// Double check that the connection is not destroyed | ||
if (this.connection.destroyed === false) { | ||
// Write out the command | ||
if (!Array.isArray(buffer)) { | ||
this.connection.write(buffer, 'binary'); | ||
return true; | ||
} | ||
// Iterate over all buffers and write them in order to the socket | ||
for (i = 0; i < buffer.length; i++) this.connection.write(buffer[i], 'binary'); | ||
return true; | ||
} | ||
// Connection is destroyed return write failed | ||
return false; | ||
}; | ||
/** | ||
* Return id of connection as a string | ||
* @method | ||
* @return {string} | ||
*/ | ||
Connection.prototype.toString = function() { | ||
return '' + this.id; | ||
}; | ||
/** | ||
* Return json object of connection | ||
* @method | ||
* @return {object} | ||
*/ | ||
Connection.prototype.toJSON = function() { | ||
return { id: this.id, host: this.host, port: this.port }; | ||
}; | ||
/** | ||
* Is the connection connected | ||
* @method | ||
* @return {boolean} | ||
*/ | ||
Connection.prototype.isConnected = function() { | ||
if (this.destroyed) return false; | ||
return !this.connection.destroyed && this.connection.writable; | ||
}; | ||
/** | ||
* A server connect event, used to verify that the connection is up and running | ||
@@ -819,2 +618,9 @@ * | ||
/** | ||
* An event emitted each time the connection receives a parsed message from the wire | ||
* | ||
* @event Connection#message | ||
* @type {Connection} | ||
*/ | ||
module.exports = Connection; |
@@ -5,3 +5,2 @@ 'use strict'; | ||
const EventEmitter = require('events').EventEmitter; | ||
const Connection = require('./connection'); | ||
const MongoError = require('../error').MongoError; | ||
@@ -13,4 +12,6 @@ const MongoNetworkError = require('../error').MongoNetworkError; | ||
const Query = require('./commands').Query; | ||
const Msg = require('./msg').Msg; | ||
const CommandResult = require('./command_result'); | ||
const MESSAGE_HEADER_SIZE = require('../wireprotocol/shared').MESSAGE_HEADER_SIZE; | ||
const COMPRESSION_DETAILS_SIZE = require('../wireprotocol/shared').COMPRESSION_DETAILS_SIZE; | ||
const opcodes = require('../wireprotocol/shared').opcodes; | ||
@@ -22,4 +23,4 @@ const compress = require('../wireprotocol/compression').compress; | ||
const apm = require('./apm'); | ||
const defaultAuthProviders = require('../auth/defaultAuthProviders').defaultAuthProviders; | ||
const Buffer = require('safe-buffer').Buffer; | ||
const connect = require('./connect'); | ||
@@ -36,2 +37,6 @@ var DISCONNECTED = 'disconnected'; | ||
if (topology == null) return false; | ||
if (topology.description) { | ||
return topology.description.maxWireVersion >= 6; | ||
} | ||
return topology.ismaster == null ? false : topology.ismaster.maxWireVersion >= 6; | ||
@@ -142,3 +147,3 @@ } | ||
this.inUseConnections = []; | ||
this.connectingConnections = []; | ||
this.connectingConnections = 0; | ||
// Currently executing | ||
@@ -149,13 +154,5 @@ this.executing = false; | ||
// All the authProviders | ||
this.authProviders = options.authProviders || defaultAuthProviders(options.bson); | ||
// Contains the reconnect connection | ||
this.reconnectConnection = null; | ||
// Are we currently authenticating | ||
this.authenticating = false; | ||
this.loggingout = false; | ||
this.nonAuthenticatedConnections = []; | ||
this.authenticatingTimestamp = null; | ||
// Number of consecutive timeouts caught | ||
@@ -165,2 +162,25 @@ this.numberOfConsecutiveTimeouts = 0; | ||
this.connectionIndex = 0; | ||
// event handlers | ||
const pool = this; | ||
this._messageHandler = messageHandler(this); | ||
this._connectionCloseHandler = function(err) { | ||
const connection = this; | ||
connectionFailureHandler(pool, 'close', err, connection); | ||
}; | ||
this._connectionErrorHandler = function(err) { | ||
const connection = this; | ||
connectionFailureHandler(pool, 'error', err, connection); | ||
}; | ||
this._connectionTimeoutHandler = function(err) { | ||
const connection = this; | ||
connectionFailureHandler(pool, 'timeout', err, connection); | ||
}; | ||
this._connectionParseErrorHandler = function(err) { | ||
const connection = this; | ||
connectionFailureHandler(pool, 'parseError', err, connection); | ||
}; | ||
}; | ||
@@ -225,119 +245,55 @@ | ||
function authenticate(pool, auth, connection, cb) { | ||
if (auth[0] === undefined) return cb(null); | ||
// We need to authenticate the server | ||
var mechanism = auth[0]; | ||
var db = auth[1]; | ||
// Validate if the mechanism exists | ||
if (!pool.authProviders[mechanism]) { | ||
throw new MongoError(f('authMechanism %s not supported', mechanism)); | ||
} | ||
function connectionFailureHandler(pool, event, err, conn) { | ||
if (conn) { | ||
if (conn._connectionFailHandled) return; | ||
conn._connectionFailHandled = true; | ||
conn.destroy(); | ||
// Get the provider | ||
var provider = pool.authProviders[mechanism]; | ||
// Authenticate using the provided mechanism | ||
provider.auth.apply(provider, [write(pool), [connection], db].concat(auth.slice(2)).concat([cb])); | ||
} | ||
// The write function used by the authentication mechanism (bypasses external) | ||
function write(self) { | ||
return function(connection, command, callback) { | ||
// Get the raw buffer | ||
// Ensure we stop auth if pool was destroyed | ||
if (self.state === DESTROYED || self.state === DESTROYING) { | ||
return callback(new MongoError('pool destroyed')); | ||
} | ||
// Set the connection workItem callback | ||
connection.workItems.push({ | ||
cb: callback, | ||
command: true, | ||
requestId: command.requestId | ||
}); | ||
// Write the buffer out to the connection | ||
connection.write(command.toBin()); | ||
}; | ||
} | ||
function reauthenticate(pool, connection, cb) { | ||
// Authenticate | ||
function authenticateAgainstProvider(pool, connection, providers, cb) { | ||
// Finished re-authenticating against providers | ||
if (providers.length === 0) return cb(); | ||
// Get the provider name | ||
var provider = pool.authProviders[providers.pop()]; | ||
// Auth provider | ||
provider.reauthenticate(write(pool), [connection], function(err) { | ||
// We got an error return immediately | ||
if (err) return cb(err); | ||
// Continue authenticating the connection | ||
authenticateAgainstProvider(pool, connection, providers, cb); | ||
}); | ||
} | ||
// Start re-authenticating process | ||
authenticateAgainstProvider(pool, connection, Object.keys(pool.authProviders), cb); | ||
} | ||
function connectionFailureHandler(self, event) { | ||
return function(err) { | ||
if (this._connectionFailHandled) return; | ||
this._connectionFailHandled = true; | ||
// Destroy the connection | ||
this.destroy(); | ||
// Remove the connection | ||
removeConnection(self, this); | ||
removeConnection(pool, conn); | ||
// Flush all work Items on this connection | ||
while (this.workItems.length > 0) { | ||
var workItem = this.workItems.shift(); | ||
while (conn.workItems.length > 0) { | ||
const workItem = conn.workItems.shift(); | ||
if (workItem.cb) workItem.cb(err); | ||
} | ||
} | ||
// Did we catch a timeout, increment the numberOfConsecutiveTimeouts | ||
if (event === 'timeout') { | ||
self.numberOfConsecutiveTimeouts = self.numberOfConsecutiveTimeouts + 1; | ||
// Did we catch a timeout, increment the numberOfConsecutiveTimeouts | ||
if (event === 'timeout') { | ||
pool.numberOfConsecutiveTimeouts = pool.numberOfConsecutiveTimeouts + 1; | ||
// Have we timed out more than reconnectTries in a row ? | ||
// Force close the pool as we are trying to connect to tcp sink hole | ||
if (self.numberOfConsecutiveTimeouts > self.options.reconnectTries) { | ||
self.numberOfConsecutiveTimeouts = 0; | ||
// Destroy all connections and pool | ||
self.destroy(true); | ||
// Emit close event | ||
return self.emit('close', self); | ||
} | ||
// Have we timed out more than reconnectTries in a row ? | ||
// Force close the pool as we are trying to connect to tcp sink hole | ||
if (pool.numberOfConsecutiveTimeouts > pool.options.reconnectTries) { | ||
pool.numberOfConsecutiveTimeouts = 0; | ||
// Destroy all connections and pool | ||
pool.destroy(true); | ||
// Emit close event | ||
return pool.emit('close', pool); | ||
} | ||
} | ||
// No more socket available propegate the event | ||
if (self.socketCount() === 0) { | ||
if (self.state !== DESTROYED && self.state !== DESTROYING) { | ||
stateTransition(self, DISCONNECTED); | ||
} | ||
// Do not emit error events, they are always close events | ||
// do not trigger the low level error handler in node | ||
event = event === 'error' ? 'close' : event; | ||
self.emit(event, err); | ||
// No more socket available propegate the event | ||
if (pool.socketCount() === 0) { | ||
if (pool.state !== DESTROYED && pool.state !== DESTROYING) { | ||
stateTransition(pool, DISCONNECTED); | ||
} | ||
// Start reconnection attempts | ||
if (!self.reconnectId && self.options.reconnect) { | ||
self.reconnectId = setTimeout(attemptReconnect(self), self.options.reconnectInterval); | ||
} | ||
// Do not emit error events, they are always close events | ||
// do not trigger the low level error handler in node | ||
event = event === 'error' ? 'close' : event; | ||
pool.emit(event, err); | ||
} | ||
// Do we need to do anything to maintain the minimum pool size | ||
const totalConnections = | ||
self.availableConnections.length + | ||
self.connectingConnections.length + | ||
self.inUseConnections.length; | ||
// Start reconnection attempts | ||
if (!pool.reconnectId && pool.options.reconnect) { | ||
pool.reconnectId = setTimeout(attemptReconnect(pool), pool.options.reconnectInterval); | ||
} | ||
if (totalConnections < self.minSize) { | ||
_createConnection(self); | ||
} | ||
}; | ||
// Do we need to do anything to maintain the minimum pool size | ||
const totalConnections = totalConnectionCount(pool); | ||
if (totalConnections < pool.minSize) { | ||
_createConnection(pool); | ||
} | ||
} | ||
@@ -356,16 +312,14 @@ | ||
// If we have failure schedule a retry | ||
function _connectionFailureHandler(self) { | ||
return function() { | ||
if (this._connectionFailHandled) return; | ||
this._connectionFailHandled = true; | ||
// Destroy the connection | ||
this.destroy(); | ||
// Count down the number of reconnects | ||
self.connectingConnections++; | ||
connect(self.options, (err, connection) => { | ||
self.connectingConnections--; | ||
if (err) { | ||
if (self.logger.isDebug()) { | ||
self.logger.debug(`connection attempt failed with error [${JSON.stringify(err)}]`); | ||
} | ||
self.retriesLeft = self.retriesLeft - 1; | ||
// How many retries are left | ||
if (self.retriesLeft <= 0) { | ||
// Destroy the instance | ||
self.destroy(); | ||
// Emit close event | ||
self.emit( | ||
@@ -384,57 +338,24 @@ 'reconnectFailed', | ||
} | ||
}; | ||
} | ||
// Got a connect handler | ||
function _connectHandler(self) { | ||
return function() { | ||
// Assign | ||
var connection = this; | ||
return; | ||
} | ||
// Pool destroyed stop the connection | ||
if (self.state === DESTROYED || self.state === DESTROYING) { | ||
return connection.destroy(); | ||
} | ||
if (self.state === DESTROYED || self.state === DESTROYING) { | ||
return connection.destroy(); | ||
} | ||
// Clear out all handlers | ||
handlers.forEach(function(event) { | ||
connection.removeAllListeners(event); | ||
}); | ||
self.reconnectId = null; | ||
handlers.forEach(event => connection.removeAllListeners(event)); | ||
connection.on('error', self._connectionErrorHandler); | ||
connection.on('close', self._connectionCloseHandler); | ||
connection.on('timeout', self._connectionTimeoutHandler); | ||
connection.on('parseError', self._connectionParseErrorHandler); | ||
connection.on('message', self._messageHandler); | ||
// Reset reconnect id | ||
self.reconnectId = null; | ||
// Apply pool connection handlers | ||
connection.on('error', connectionFailureHandler(self, 'error')); | ||
connection.on('close', connectionFailureHandler(self, 'close')); | ||
connection.on('timeout', connectionFailureHandler(self, 'timeout')); | ||
connection.on('parseError', connectionFailureHandler(self, 'parseError')); | ||
// Apply any auth to the connection | ||
reauthenticate(self, this, function() { | ||
// Reset retries | ||
self.retriesLeft = self.options.reconnectTries; | ||
// Push to available connections | ||
self.availableConnections.push(connection); | ||
// Set the reconnectConnection to null | ||
self.reconnectConnection = null; | ||
// Emit reconnect event | ||
self.emit('reconnect', self); | ||
// Trigger execute to start everything up again | ||
_execute(self)(); | ||
}); | ||
}; | ||
} | ||
// Create a connection | ||
self.reconnectConnection = new Connection(messageHandler(self), self.options); | ||
// Add handlers | ||
self.reconnectConnection.on('close', _connectionFailureHandler(self, 'close')); | ||
self.reconnectConnection.on('error', _connectionFailureHandler(self, 'error')); | ||
self.reconnectConnection.on('timeout', _connectionFailureHandler(self, 'timeout')); | ||
self.reconnectConnection.on('parseError', _connectionFailureHandler(self, 'parseError')); | ||
// On connection | ||
self.reconnectConnection.on('connect', _connectHandler(self)); | ||
// Attempt connection | ||
self.reconnectConnection.connect(); | ||
self.retriesLeft = self.options.reconnectTries; | ||
self.availableConnections.push(connection); | ||
self.reconnectConnection = null; | ||
self.emit('reconnect', self); | ||
_execute(self)(); | ||
}); | ||
}; | ||
@@ -467,2 +388,6 @@ } | ||
if (workItem && workItem.monitoring) { | ||
moveConnectionBetween(connection, self.inUseConnections, self.availableConnections); | ||
} | ||
// Reset timeout counter | ||
@@ -489,52 +414,2 @@ self.numberOfConsecutiveTimeouts = 0; | ||
// Authenticate any straggler connections | ||
function authenticateStragglers(self, connection, callback) { | ||
// Get any non authenticated connections | ||
var connections = self.nonAuthenticatedConnections.slice(0); | ||
var nonAuthenticatedConnections = self.nonAuthenticatedConnections; | ||
self.nonAuthenticatedConnections = []; | ||
// Establish if the connection need to be authenticated | ||
// Add to authentication list if | ||
// 1. we were in an authentication process when the operation was executed | ||
// 2. our current authentication timestamp is from the workItem one, meaning an auth has happened | ||
if ( | ||
connection.workItems.length === 1 && | ||
(connection.workItems[0].authenticating === true || | ||
(typeof connection.workItems[0].authenticatingTimestamp === 'number' && | ||
connection.workItems[0].authenticatingTimestamp !== self.authenticatingTimestamp)) | ||
) { | ||
// Add connection to the list | ||
connections.push(connection); | ||
} | ||
// No connections need to be re-authenticated | ||
if (connections.length === 0) { | ||
// Release the connection back to the pool | ||
moveConnectionBetween(connection, self.inUseConnections, self.availableConnections); | ||
// Finish | ||
return callback(); | ||
} | ||
// Apply re-authentication to all connections before releasing back to pool | ||
var connectionCount = connections.length; | ||
// Authenticate all connections | ||
for (var i = 0; i < connectionCount; i++) { | ||
reauthenticate(self, connections[i], function() { | ||
connectionCount = connectionCount - 1; | ||
if (connectionCount === 0) { | ||
// Put non authenticated connections in available connections | ||
self.availableConnections = self.availableConnections.concat( | ||
nonAuthenticatedConnections | ||
); | ||
// Release the connection back to the pool | ||
moveConnectionBetween(connection, self.inUseConnections, self.availableConnections); | ||
// Return | ||
callback(); | ||
} | ||
}); | ||
} | ||
} | ||
function handleOperationCallback(self, cb, err, result) { | ||
@@ -552,71 +427,67 @@ // No domain enabled | ||
authenticateStragglers(self, connection, function() { | ||
// Keep executing, ensure current message handler does not stop execution | ||
if (!self.executing) { | ||
process.nextTick(function() { | ||
_execute(self)(); | ||
}); | ||
// Keep executing, ensure current message handler does not stop execution | ||
if (!self.executing) { | ||
process.nextTick(function() { | ||
_execute(self)(); | ||
}); | ||
} | ||
// Time to dispatch the message if we have a callback | ||
if (workItem && !workItem.immediateRelease) { | ||
try { | ||
// Parse the message according to the provided options | ||
message.parse(workItem); | ||
} catch (err) { | ||
return handleOperationCallback(self, workItem.cb, new MongoError(err)); | ||
} | ||
// Time to dispatch the message if we have a callback | ||
if (workItem && !workItem.immediateRelease) { | ||
try { | ||
// Parse the message according to the provided options | ||
message.parse(workItem); | ||
} catch (err) { | ||
return handleOperationCallback(self, workItem.cb, new MongoError(err)); | ||
} | ||
// Look for clusterTime, operationTime, and recoveryToken and update them if necessary | ||
if (message.documents[0]) { | ||
const session = workItem.session; | ||
const document = message.documents[0]; | ||
if (document.$clusterTime) { | ||
const $clusterTime = document.$clusterTime; | ||
self.topology.clusterTime = $clusterTime; | ||
// Look for clusterTime, and operationTime and update them if necessary | ||
if (message.documents[0]) { | ||
if (message.documents[0].$clusterTime) { | ||
const $clusterTime = message.documents[0].$clusterTime; | ||
self.topology.clusterTime = $clusterTime; | ||
if (workItem.session != null) { | ||
resolveClusterTime(workItem.session, $clusterTime); | ||
} | ||
if (session != null) { | ||
resolveClusterTime(session, $clusterTime); | ||
} | ||
} | ||
if ( | ||
message.documents[0].operationTime && | ||
workItem.session && | ||
workItem.session.supports.causalConsistency | ||
) { | ||
workItem.session.advanceOperationTime(message.documents[0].operationTime); | ||
} | ||
if (document.operationTime && session && session.supports.causalConsistency) { | ||
session.advanceOperationTime(message.documents[0].operationTime); | ||
} | ||
// Establish if we have an error | ||
if (workItem.command && message.documents[0]) { | ||
const responseDoc = message.documents[0]; | ||
if (responseDoc.ok === 0 || responseDoc.$err || responseDoc.errmsg || responseDoc.code) { | ||
return handleOperationCallback(self, workItem.cb, new MongoError(responseDoc)); | ||
} | ||
if (document.recoveryToken && session && session.inTransaction()) { | ||
session.transaction._recoveryToken = document.recoveryToken; | ||
} | ||
} | ||
if (responseDoc.writeConcernError) { | ||
const err = | ||
responseDoc.ok === 1 | ||
? new MongoWriteConcernError(responseDoc.writeConcernError, responseDoc) | ||
: new MongoWriteConcernError(responseDoc.writeConcernError); | ||
return handleOperationCallback(self, workItem.cb, err); | ||
} | ||
// Establish if we have an error | ||
if (workItem.command && message.documents[0]) { | ||
const responseDoc = message.documents[0]; | ||
if (responseDoc.ok === 0 || responseDoc.$err || responseDoc.errmsg || responseDoc.code) { | ||
return handleOperationCallback(self, workItem.cb, new MongoError(responseDoc)); | ||
} | ||
// Add the connection details | ||
message.hashedName = connection.hashedName; | ||
if (responseDoc.writeConcernError) { | ||
const err = | ||
responseDoc.ok === 1 | ||
? new MongoWriteConcernError(responseDoc.writeConcernError, responseDoc) | ||
: new MongoWriteConcernError(responseDoc.writeConcernError); | ||
return handleOperationCallback(self, workItem.cb, err); | ||
} | ||
} | ||
// Return the documents | ||
handleOperationCallback( | ||
self, | ||
workItem.cb, | ||
null, | ||
new CommandResult( | ||
workItem.fullResult ? message : message.documents[0], | ||
connection, | ||
message | ||
) | ||
); | ||
} | ||
}); | ||
// Add the connection details | ||
message.hashedName = connection.hashedName; | ||
// Return the documents | ||
handleOperationCallback( | ||
self, | ||
workItem.cb, | ||
null, | ||
new CommandResult(workItem.fullResult ? message : message.documents[0], connection, message) | ||
); | ||
} | ||
}; | ||
@@ -635,2 +506,8 @@ } | ||
function totalConnectionCount(pool) { | ||
return ( | ||
pool.availableConnections.length + pool.inUseConnections.length + pool.connectingConnections | ||
); | ||
} | ||
/** | ||
@@ -642,3 +519,3 @@ * Return all pool connections | ||
Pool.prototype.allConnections = function() { | ||
return this.availableConnections.concat(this.inUseConnections).concat(this.connectingConnections); | ||
return this.availableConnections.concat(this.inUseConnections); | ||
}; | ||
@@ -674,7 +551,2 @@ | ||
// Might be authenticating, but we are still connected | ||
if (connections.length === 0 && this.authenticating) { | ||
return true; | ||
} | ||
// Not connected | ||
@@ -704,3 +576,2 @@ return false; | ||
* Connect pool | ||
* @method | ||
*/ | ||
@@ -712,15 +583,32 @@ Pool.prototype.connect = function() { | ||
var self = this; | ||
// Transition to connecting state | ||
const self = this; | ||
stateTransition(this, CONNECTING); | ||
// Create an array of the arguments | ||
var args = Array.prototype.slice.call(arguments, 0); | ||
// Create a connection | ||
var connection = new Connection(messageHandler(self), this.options); | ||
// Add to list of connections | ||
this.connectingConnections.push(connection); | ||
// Add listeners to the connection | ||
connection.once('connect', function(connection) { | ||
if (self.state === DESTROYED || self.state === DESTROYING) return self.destroy(); | ||
self.connectingConnections++; | ||
connect(self.options, (err, connection) => { | ||
self.connectingConnections--; | ||
if (err) { | ||
if (self.logger.isDebug()) { | ||
self.logger.debug(`connection attempt failed with error [${JSON.stringify(err)}]`); | ||
} | ||
if (self.state === CONNECTING) { | ||
self.emit('error', err); | ||
} | ||
return; | ||
} | ||
if (self.state === DESTROYED || self.state === DESTROYING) { | ||
return self.destroy(); | ||
} | ||
// attach event handlers | ||
connection.on('error', self._connectionErrorHandler); | ||
connection.on('close', self._connectionCloseHandler); | ||
connection.on('timeout', self._connectionTimeoutHandler); | ||
connection.on('parseError', self._connectionParseErrorHandler); | ||
connection.on('message', self._messageHandler); | ||
// If we are in a topology, delegate the auth to it | ||
@@ -730,66 +618,27 @@ // This is to avoid issues where we would auth against an | ||
if (self.options.inTopology) { | ||
// Set connected mode | ||
stateTransition(self, CONNECTED); | ||
self.availableConnections.push(connection); | ||
return self.emit('connect', self, connection); | ||
} | ||
// Move the active connection | ||
moveConnectionBetween(connection, self.connectingConnections, self.availableConnections); | ||
if (self.state === DESTROYED || self.state === DESTROYING) { | ||
return self.destroy(); | ||
} | ||
// Emit the connect event | ||
return self.emit('connect', self); | ||
if (err) { | ||
self.destroy(); | ||
return self.emit('error', err); | ||
} | ||
// Apply any store credentials | ||
reauthenticate(self, connection, function(err) { | ||
if (self.state === DESTROYED || self.state === DESTROYING) return self.destroy(); | ||
stateTransition(self, CONNECTED); | ||
self.availableConnections.push(connection); | ||
// We have an error emit it | ||
if (err) { | ||
// Destroy the pool | ||
self.destroy(); | ||
// Emit the error | ||
return self.emit('error', err); | ||
if (self.minSize) { | ||
for (let i = 0; i < self.minSize; i++) { | ||
_createConnection(self); | ||
} | ||
} | ||
// Authenticate | ||
authenticate(self, args, connection, function(err) { | ||
if (self.state === DESTROYED || self.state === DESTROYING) return self.destroy(); | ||
// We have an error emit it | ||
if (err) { | ||
// Destroy the pool | ||
self.destroy(); | ||
// Emit the error | ||
return self.emit('error', err); | ||
} | ||
// Set connected mode | ||
stateTransition(self, CONNECTED); | ||
// Move the active connection | ||
moveConnectionBetween(connection, self.connectingConnections, self.availableConnections); | ||
// if we have a minPoolSize, create a connection | ||
if (self.minSize) { | ||
for (let i = 0; i < self.minSize; i++) _createConnection(self); | ||
} | ||
// Emit the connect event | ||
self.emit('connect', self); | ||
}); | ||
}); | ||
self.emit('connect', self, connection); | ||
}); | ||
// Add error handlers | ||
connection.once('error', connectionFailureHandler(this, 'error')); | ||
connection.once('close', connectionFailureHandler(this, 'close')); | ||
connection.once('timeout', connectionFailureHandler(this, 'timeout')); | ||
connection.once('parseError', connectionFailureHandler(this, 'parseError')); | ||
try { | ||
connection.connect(); | ||
} catch (err) { | ||
// SSL or something threw on connect | ||
process.nextTick(function() { | ||
self.emit('error', err); | ||
}); | ||
} | ||
}; | ||
@@ -799,94 +648,6 @@ | ||
* Authenticate using a specified mechanism | ||
* @method | ||
* @param {string} mechanism The Auth mechanism we are invoking | ||
* @param {string} db The db we are invoking the mechanism against | ||
* @param {...object} param Parameters for the specific mechanism | ||
* @param {authResultCallback} callback A callback function | ||
*/ | ||
Pool.prototype.auth = function(mechanism) { | ||
var self = this; | ||
var args = Array.prototype.slice.call(arguments, 0); | ||
var callback = args.pop(); | ||
// If we don't have the mechanism fail | ||
if (self.authProviders[mechanism] == null && mechanism !== 'default') { | ||
throw new MongoError(f('auth provider %s does not exist', mechanism)); | ||
} | ||
// Signal that we are authenticating a new set of credentials | ||
this.authenticating = true; | ||
this.authenticatingTimestamp = new Date().getTime(); | ||
// Authenticate all live connections | ||
function authenticateLiveConnections(self, args, cb) { | ||
// Get the current viable connections | ||
var connections = self.allConnections(); | ||
// Allow nothing else to use the connections while we authenticate them | ||
self.availableConnections = []; | ||
self.inUseConnections = []; | ||
self.connectingConnections = []; | ||
var connectionsCount = connections.length; | ||
var error = null; | ||
// No connections available, return | ||
if (connectionsCount === 0) { | ||
self.authenticating = false; | ||
return callback(null); | ||
} | ||
// Authenticate the connections | ||
for (var i = 0; i < connections.length; i++) { | ||
authenticate(self, args, connections[i], function(err, result) { | ||
connectionsCount = connectionsCount - 1; | ||
// Store the error | ||
if (err) error = err; | ||
// Processed all connections | ||
if (connectionsCount === 0) { | ||
// Auth finished | ||
self.authenticating = false; | ||
// Add the connections back to available connections | ||
self.availableConnections = self.availableConnections.concat(connections); | ||
// We had an error, return it | ||
if (error) { | ||
// Log the error | ||
if (self.logger.isError()) { | ||
self.logger.error( | ||
f( | ||
'[%s] failed to authenticate against server %s:%s', | ||
self.id, | ||
self.options.host, | ||
self.options.port | ||
) | ||
); | ||
} | ||
return cb(error, result); | ||
} | ||
cb(null, result); | ||
} | ||
}); | ||
} | ||
} | ||
// Wait for a logout in process to happen | ||
function waitForLogout(self, cb) { | ||
if (!self.loggingout) return cb(); | ||
setTimeout(function() { | ||
waitForLogout(self, cb); | ||
}, 1); | ||
} | ||
// Wait for loggout to finish | ||
waitForLogout(self, function() { | ||
// Authenticate all live connections | ||
authenticateLiveConnections(self, args, function(err, result) { | ||
// Credentials correctly stored in auth provider if successful | ||
// Any new connections will now reauthenticate correctly | ||
self.authenticating = false; | ||
// Return after authentication connections | ||
callback(err, result); | ||
}); | ||
}); | ||
Pool.prototype.auth = function(credentials, callback) { | ||
callback(null, null); | ||
}; | ||
@@ -896,46 +657,6 @@ | ||
* Logout all users against a database | ||
* @method | ||
* @param {string} dbName The database name | ||
* @param {authResultCallback} callback A callback function | ||
*/ | ||
Pool.prototype.logout = function(dbName, callback) { | ||
var self = this; | ||
if (typeof dbName !== 'string') { | ||
throw new MongoError('logout method requires a db name as first argument'); | ||
} | ||
if (typeof callback !== 'function') { | ||
throw new MongoError('logout method requires a callback'); | ||
} | ||
// Indicate logout in process | ||
this.loggingout = true; | ||
// Get all relevant connections | ||
var connections = self.availableConnections.concat(self.inUseConnections); | ||
var count = connections.length; | ||
// Store any error | ||
var error = null; | ||
// Send logout command over all the connections | ||
for (var i = 0; i < connections.length; i++) { | ||
write(self)( | ||
connections[i], | ||
new Query( | ||
this.options.bson, | ||
f('%s.$cmd', dbName), | ||
{ logout: 1 }, | ||
{ numberToSkip: 0, numberToReturn: 1 } | ||
), | ||
function(err) { | ||
count = count - 1; | ||
if (err) error = err; | ||
if (count === 0) { | ||
self.loggingout = false; | ||
callback(error); | ||
} | ||
} | ||
); | ||
} | ||
callback(null, null); | ||
}; | ||
@@ -949,5 +670,4 @@ | ||
// Get all the known connections | ||
var connections = this.availableConnections | ||
.concat(this.inUseConnections) | ||
.concat(this.connectingConnections); | ||
var connections = this.availableConnections.concat(this.inUseConnections); | ||
connections.forEach(function(c) { | ||
@@ -959,3 +679,3 @@ c.unref(); | ||
// Events | ||
var events = ['error', 'close', 'timeout', 'parseError', 'connect']; | ||
var events = ['error', 'close', 'timeout', 'parseError', 'connect', 'message']; | ||
@@ -977,4 +697,3 @@ // Destroy the connections | ||
self.availableConnections = []; | ||
self.nonAuthenticatedConnections = []; | ||
self.connectingConnections = []; | ||
self.connectingConnections = 0; | ||
@@ -999,6 +718,3 @@ // Set state to destroyed | ||
// Get all the known connections | ||
var connections = self.availableConnections | ||
.concat(self.inUseConnections) | ||
.concat(self.nonAuthenticatedConnections) | ||
.concat(self.connectingConnections); | ||
var connections = self.availableConnections.concat(self.inUseConnections); | ||
@@ -1035,6 +751,3 @@ // Flush any remaining work items with | ||
// Get all the known connections | ||
var connections = self.availableConnections | ||
.concat(self.inUseConnections) | ||
.concat(self.nonAuthenticatedConnections) | ||
.concat(self.connectingConnections); | ||
var connections = self.availableConnections.concat(self.inUseConnections); | ||
@@ -1064,52 +777,64 @@ // Check if we have any in flight operations | ||
/** | ||
* Reset all connections of this pool | ||
* | ||
* @param {function} [callback] | ||
*/ | ||
Pool.prototype.reset = function(callback) { | ||
// this.destroy(true, err => { | ||
// if (err && typeof callback === 'function') { | ||
// callback(err, null); | ||
// return; | ||
// } | ||
// stateTransition(this, DISCONNECTED); | ||
// this.connect(); | ||
// if (typeof callback === 'function') callback(null, null); | ||
// }); | ||
if (typeof callback === 'function') callback(); | ||
}; | ||
// Prepare the buffer that Pool.prototype.write() uses to send to the server | ||
var serializeCommands = function(self, commands, result, callback) { | ||
// Base case when there are no more commands to serialize | ||
if (commands.length === 0) return callback(null, result); | ||
function serializeCommand(self, command, callback) { | ||
const originalCommandBuffer = command.toBin(); | ||
// Pop off the zeroth command and serialize it | ||
var thisCommand = commands.shift(); | ||
var originalCommandBuffer = thisCommand.toBin(); | ||
// Check whether we and the server have agreed to use a compressor | ||
if (self.options.agreedCompressor && !hasUncompressibleCommands(thisCommand)) { | ||
// Transform originalCommandBuffer into OP_COMPRESSED | ||
var concatenatedOriginalCommandBuffer = Buffer.concat(originalCommandBuffer); | ||
var messageToBeCompressed = concatenatedOriginalCommandBuffer.slice(MESSAGE_HEADER_SIZE); | ||
const shouldCompress = !!self.options.agreedCompressor; | ||
if (!shouldCompress || !canCompress(command)) { | ||
return callback(null, originalCommandBuffer); | ||
} | ||
// Extract information needed for OP_COMPRESSED from the uncompressed message | ||
var originalCommandOpCode = concatenatedOriginalCommandBuffer.readInt32LE(12); | ||
// Transform originalCommandBuffer into OP_COMPRESSED | ||
const concatenatedOriginalCommandBuffer = Buffer.concat(originalCommandBuffer); | ||
const messageToBeCompressed = concatenatedOriginalCommandBuffer.slice(MESSAGE_HEADER_SIZE); | ||
// Compress the message body | ||
compress(self, messageToBeCompressed, function(err, compressedMessage) { | ||
if (err) return callback(err, null); | ||
// Extract information needed for OP_COMPRESSED from the uncompressed message | ||
const originalCommandOpCode = concatenatedOriginalCommandBuffer.readInt32LE(12); | ||
// Create the msgHeader of OP_COMPRESSED | ||
var msgHeader = Buffer.alloc(MESSAGE_HEADER_SIZE); | ||
msgHeader.writeInt32LE(MESSAGE_HEADER_SIZE + 9 + compressedMessage.length, 0); // messageLength | ||
msgHeader.writeInt32LE(thisCommand.requestId, 4); // requestID | ||
msgHeader.writeInt32LE(0, 8); // responseTo (zero) | ||
msgHeader.writeInt32LE(opcodes.OP_COMPRESSED, 12); // opCode | ||
// Compress the message body | ||
compress(self, messageToBeCompressed, function(err, compressedMessage) { | ||
if (err) return callback(err, null); | ||
// Create the compression details of OP_COMPRESSED | ||
var compressionDetails = Buffer.alloc(9); | ||
compressionDetails.writeInt32LE(originalCommandOpCode, 0); // originalOpcode | ||
compressionDetails.writeInt32LE(messageToBeCompressed.length, 4); // Size of the uncompressed compressedMessage, excluding the MsgHeader | ||
compressionDetails.writeUInt8(compressorIDs[self.options.agreedCompressor], 8); // compressorID | ||
// Create the msgHeader of OP_COMPRESSED | ||
const msgHeader = Buffer.alloc(MESSAGE_HEADER_SIZE); | ||
msgHeader.writeInt32LE( | ||
MESSAGE_HEADER_SIZE + COMPRESSION_DETAILS_SIZE + compressedMessage.length, | ||
0 | ||
); // messageLength | ||
msgHeader.writeInt32LE(command.requestId, 4); // requestID | ||
msgHeader.writeInt32LE(0, 8); // responseTo (zero) | ||
msgHeader.writeInt32LE(opcodes.OP_COMPRESSED, 12); // opCode | ||
// Push the concatenation of the OP_COMPRESSED message onto results | ||
result.push(Buffer.concat([msgHeader, compressionDetails, compressedMessage])); | ||
// Create the compression details of OP_COMPRESSED | ||
const compressionDetails = Buffer.alloc(COMPRESSION_DETAILS_SIZE); | ||
compressionDetails.writeInt32LE(originalCommandOpCode, 0); // originalOpcode | ||
compressionDetails.writeInt32LE(messageToBeCompressed.length, 4); // Size of the uncompressed compressedMessage, excluding the MsgHeader | ||
compressionDetails.writeUInt8(compressorIDs[self.options.agreedCompressor], 8); // compressorID | ||
// Continue recursing through the commands array | ||
serializeCommands(self, commands, result, callback); | ||
}); | ||
} else { | ||
// Push the serialization of the command onto results | ||
result.push(originalCommandBuffer); | ||
return callback(null, [msgHeader, compressionDetails, compressedMessage]); | ||
}); | ||
} | ||
// Continue recursing through the commands array | ||
serializeCommands(self, commands, result, callback); | ||
} | ||
}; | ||
/** | ||
@@ -1120,3 +845,3 @@ * Write a message to MongoDB | ||
*/ | ||
Pool.prototype.write = function(commands, options, cb) { | ||
Pool.prototype.write = function(command, options, cb) { | ||
var self = this; | ||
@@ -1201,9 +926,4 @@ // Ensure we have a callback | ||
// Ensure commands is an array | ||
if (!Array.isArray(commands)) { | ||
commands = [commands]; | ||
} | ||
// Get the requestId | ||
operation.requestId = commands[commands.length - 1].requestId; | ||
operation.requestId = command.requestId; | ||
@@ -1244,13 +964,14 @@ if (hasSessionSupport(this.topology)) { | ||
// decorate the commands with session-specific details | ||
commands.forEach(command => { | ||
if (command instanceof Query) { | ||
if (command.query.$query) { | ||
Object.assign(command.query.$query, sessionOptions); | ||
} else { | ||
Object.assign(command.query, sessionOptions); | ||
} | ||
} else { | ||
Object.assign(command, sessionOptions); | ||
} | ||
}); | ||
let commandDocument = command; | ||
if (command instanceof Query) { | ||
commandDocument = command.query; | ||
} else if (command instanceof Msg) { | ||
commandDocument = command.command; | ||
} | ||
if (commandDocument.$query) { | ||
commandDocument = commandDocument.$query; | ||
} | ||
Object.assign(commandDocument, sessionOptions); | ||
} | ||
@@ -1260,5 +981,2 @@ | ||
if (self.options.monitorCommands) { | ||
// NOTE: there is only ever a single command, for some legacy reason I am unaware of we | ||
// treat this as a potential array of commands | ||
const command = commands[0]; | ||
this.emit('commandStarted', new apm.CommandStartedEvent(this, command)); | ||
@@ -1292,7 +1010,7 @@ | ||
// Prepare the operation buffer | ||
serializeCommands(self, commands, [], function(err, serializedCommands) { | ||
serializeCommand(self, command, (err, serializedBuffers) => { | ||
if (err) throw err; | ||
// Set the operation's buffer to the serialization of the commands | ||
operation.buffer = serializedCommands; | ||
operation.buffer = serializedBuffers; | ||
@@ -1318,7 +1036,7 @@ // If we have a monitoring operation schedule as the very first operation | ||
// Will return true if command contains no uncompressible command terms | ||
var hasUncompressibleCommands = function(command) { | ||
return uncompressibleCommands.some(function(cmd) { | ||
return command.query.hasOwnProperty(cmd); | ||
}); | ||
}; | ||
function canCompress(command) { | ||
const commandDoc = command instanceof Msg ? command.command : command.query; | ||
const commandName = Object.keys(commandDoc)[0]; | ||
return uncompressibleCommands.indexOf(commandName) === -1; | ||
} | ||
@@ -1338,9 +1056,5 @@ // Remove connection method | ||
if (remove(connection, self.inUseConnections)) return; | ||
if (remove(connection, self.connectingConnections)) return; | ||
if (remove(connection, self.nonAuthenticatedConnections)) return; | ||
} | ||
// All event handlers | ||
var handlers = ['close', 'message', 'error', 'timeout', 'parseError', 'connect']; | ||
const handlers = ['close', 'message', 'error', 'timeout', 'parseError', 'connect']; | ||
function _createConnection(self) { | ||
@@ -1350,79 +1064,47 @@ if (self.state === DESTROYED || self.state === DESTROYING) { | ||
} | ||
var connection = new Connection(messageHandler(self), self.options); | ||
// Push the connection | ||
self.connectingConnections.push(connection); | ||
self.connectingConnections++; | ||
connect(self.options, (err, connection) => { | ||
self.connectingConnections--; | ||
// Handle any errors | ||
var tempErrorHandler = function(_connection) { | ||
return function() { | ||
// Destroy the connection | ||
_connection.destroy(); | ||
// Remove the connection from the connectingConnections list | ||
removeConnection(self, _connection); | ||
// Start reconnection attempts | ||
if (err) { | ||
if (self.logger.isDebug()) { | ||
self.logger.debug(`connection attempt failed with error [${JSON.stringify(err)}]`); | ||
} | ||
if (!self.reconnectId && self.options.reconnect) { | ||
self.reconnectId = setTimeout(attemptReconnect(self), self.options.reconnectInterval); | ||
} | ||
}; | ||
}; | ||
// Handle successful connection | ||
var tempConnectHandler = function(_connection) { | ||
return function() { | ||
// Destroyed state return | ||
if (self.state === DESTROYED || self.state === DESTROYING) { | ||
// Remove the connection from the list | ||
removeConnection(self, _connection); | ||
return _connection.destroy(); | ||
} | ||
return; | ||
} | ||
// Destroy all event emitters | ||
handlers.forEach(function(e) { | ||
_connection.removeAllListeners(e); | ||
}); | ||
if (self.state === DESTROYED || self.state === DESTROYING) { | ||
removeConnection(self, connection); | ||
return connection.destroy(); | ||
} | ||
// Add the final handlers | ||
_connection.once('close', connectionFailureHandler(self, 'close')); | ||
_connection.once('error', connectionFailureHandler(self, 'error')); | ||
_connection.once('timeout', connectionFailureHandler(self, 'timeout')); | ||
_connection.once('parseError', connectionFailureHandler(self, 'parseError')); | ||
connection.on('error', self._connectionErrorHandler); | ||
connection.on('close', self._connectionCloseHandler); | ||
connection.on('timeout', self._connectionTimeoutHandler); | ||
connection.on('parseError', self._connectionParseErrorHandler); | ||
connection.on('message', self._messageHandler); | ||
// Signal | ||
reauthenticate(self, _connection, function(err) { | ||
if (self.state === DESTROYED || self.state === DESTROYING) { | ||
return _connection.destroy(); | ||
} | ||
// Remove the connection from the connectingConnections list | ||
removeConnection(self, _connection); | ||
if (self.state === DESTROYED || self.state === DESTROYING) { | ||
return connection.destroy(); | ||
} | ||
// Handle error | ||
if (err) { | ||
return _connection.destroy(); | ||
} | ||
// Remove the connection from the connectingConnections list | ||
removeConnection(self, connection); | ||
// If we are c at the moment | ||
// Do not automatially put in available connections | ||
// As we need to apply the credentials first | ||
if (self.authenticating) { | ||
self.nonAuthenticatedConnections.push(_connection); | ||
} else { | ||
// Push to available | ||
self.availableConnections.push(_connection); | ||
// Execute any work waiting | ||
_execute(self)(); | ||
} | ||
}); | ||
}; | ||
}; | ||
// Handle error | ||
if (err) { | ||
return connection.destroy(); | ||
} | ||
// Add all handlers | ||
connection.once('close', tempErrorHandler(connection)); | ||
connection.once('error', tempErrorHandler(connection)); | ||
connection.once('timeout', tempErrorHandler(connection)); | ||
connection.once('parseError', tempErrorHandler(connection)); | ||
connection.once('connect', tempConnectHandler(connection)); | ||
// Start connection | ||
connection.connect(); | ||
// Push to available | ||
self.availableConnections.push(connection); | ||
// Execute any work waiting | ||
_execute(self)(); | ||
}); | ||
} | ||
@@ -1450,181 +1132,153 @@ | ||
// Wait for auth to clear before continuing | ||
function waitForAuth(cb) { | ||
if (!self.authenticating) return cb(); | ||
// Wait for a milisecond and try again | ||
setTimeout(function() { | ||
waitForAuth(cb); | ||
}, 1); | ||
// New pool connections are in progress, wait them to finish | ||
// before executing any more operation to ensure distribution of | ||
// operations | ||
if (self.connectingConnections > 0) { | ||
self.executing = false; | ||
return; | ||
} | ||
// Block on any auth in process | ||
waitForAuth(function() { | ||
// New pool connections are in progress, wait them to finish | ||
// before executing any more operation to ensure distribution of | ||
// operations | ||
if (self.connectingConnections.length > 0) { | ||
return; | ||
// As long as we have available connections | ||
// eslint-disable-next-line | ||
while (true) { | ||
// Total availble connections | ||
const totalConnections = totalConnectionCount(self); | ||
// No available connections available, flush any monitoring ops | ||
if (self.availableConnections.length === 0) { | ||
// Flush any monitoring operations | ||
flushMonitoringOperations(self.queue); | ||
break; | ||
} | ||
// As long as we have available connections | ||
// eslint-disable-next-line | ||
while (true) { | ||
// Total availble connections | ||
var totalConnections = | ||
self.availableConnections.length + | ||
self.connectingConnections.length + | ||
self.inUseConnections.length; | ||
// No queue break | ||
if (self.queue.length === 0) { | ||
break; | ||
} | ||
// No available connections available, flush any monitoring ops | ||
if (self.availableConnections.length === 0) { | ||
// Flush any monitoring operations | ||
flushMonitoringOperations(self.queue); | ||
break; | ||
} | ||
var connection = null; | ||
const connections = self.availableConnections.filter(conn => conn.workItems.length === 0); | ||
// No queue break | ||
if (self.queue.length === 0) { | ||
break; | ||
} | ||
// No connection found that has no work on it, just pick one for pipelining | ||
if (connections.length === 0) { | ||
connection = | ||
self.availableConnections[self.connectionIndex++ % self.availableConnections.length]; | ||
} else { | ||
connection = connections[self.connectionIndex++ % connections.length]; | ||
} | ||
// Get a connection | ||
var connection = null; | ||
// Is the connection connected | ||
if (!connection.isConnected()) { | ||
// Remove the disconnected connection | ||
removeConnection(self, connection); | ||
// Flush any monitoring operations in the queue, failing fast | ||
flushMonitoringOperations(self.queue); | ||
break; | ||
} | ||
// Locate all connections that have no work | ||
var connections = []; | ||
// Get a list of all connections | ||
for (var i = 0; i < self.availableConnections.length; i++) { | ||
if (self.availableConnections[i].workItems.length === 0) { | ||
connections.push(self.availableConnections[i]); | ||
// Get the next work item | ||
var workItem = self.queue.shift(); | ||
// If we are monitoring we need to use a connection that is not | ||
// running another operation to avoid socket timeout changes | ||
// affecting an existing operation | ||
if (workItem.monitoring) { | ||
var foundValidConnection = false; | ||
for (let i = 0; i < self.availableConnections.length; i++) { | ||
// If the connection is connected | ||
// And there are no pending workItems on it | ||
// Then we can safely use it for monitoring. | ||
if ( | ||
self.availableConnections[i].isConnected() && | ||
self.availableConnections[i].workItems.length === 0 | ||
) { | ||
foundValidConnection = true; | ||
connection = self.availableConnections[i]; | ||
break; | ||
} | ||
} | ||
// No connection found that has no work on it, just pick one for pipelining | ||
if (connections.length === 0) { | ||
connection = | ||
self.availableConnections[self.connectionIndex++ % self.availableConnections.length]; | ||
} else { | ||
connection = connections[self.connectionIndex++ % connections.length]; | ||
} | ||
// No safe connection found, attempt to grow the connections | ||
// if possible and break from the loop | ||
if (!foundValidConnection) { | ||
// Put workItem back on the queue | ||
self.queue.unshift(workItem); | ||
// Is the connection connected | ||
if (connection.isConnected()) { | ||
// Get the next work item | ||
var workItem = self.queue.shift(); | ||
// If we are monitoring we need to use a connection that is not | ||
// running another operation to avoid socket timeout changes | ||
// affecting an existing operation | ||
if (workItem.monitoring) { | ||
var foundValidConnection = false; | ||
for (i = 0; i < self.availableConnections.length; i++) { | ||
// If the connection is connected | ||
// And there are no pending workItems on it | ||
// Then we can safely use it for monitoring. | ||
if ( | ||
self.availableConnections[i].isConnected() && | ||
self.availableConnections[i].workItems.length === 0 | ||
) { | ||
foundValidConnection = true; | ||
connection = self.availableConnections[i]; | ||
break; | ||
} | ||
} | ||
// No safe connection found, attempt to grow the connections | ||
// if possible and break from the loop | ||
if (!foundValidConnection) { | ||
// Put workItem back on the queue | ||
self.queue.unshift(workItem); | ||
// Attempt to grow the pool if it's not yet maxsize | ||
if (totalConnections < self.options.size && self.queue.length > 0) { | ||
// Create a new connection | ||
_createConnection(self); | ||
} | ||
// Re-execute the operation | ||
setTimeout(function() { | ||
_execute(self)(); | ||
}, 10); | ||
break; | ||
} | ||
// Attempt to grow the pool if it's not yet maxsize | ||
if (totalConnections < self.options.size && self.queue.length > 0) { | ||
// Create a new connection | ||
_createConnection(self); | ||
} | ||
// Don't execute operation until we have a full pool | ||
if (totalConnections < self.options.size) { | ||
// Connection has work items, then put it back on the queue | ||
// and create a new connection | ||
if (connection.workItems.length > 0) { | ||
// Lets put the workItem back on the list | ||
self.queue.unshift(workItem); | ||
// Create a new connection | ||
_createConnection(self); | ||
// Break from the loop | ||
break; | ||
} | ||
} | ||
// Re-execute the operation | ||
setTimeout(function() { | ||
_execute(self)(); | ||
}, 10); | ||
// Get actual binary commands | ||
var buffer = workItem.buffer; | ||
break; | ||
} | ||
} | ||
// Set current status of authentication process | ||
workItem.authenticating = self.authenticating; | ||
workItem.authenticatingTimestamp = self.authenticatingTimestamp; | ||
// Don't execute operation until we have a full pool | ||
if (totalConnections < self.options.size) { | ||
// Connection has work items, then put it back on the queue | ||
// and create a new connection | ||
if (connection.workItems.length > 0) { | ||
// Lets put the workItem back on the list | ||
self.queue.unshift(workItem); | ||
// Create a new connection | ||
_createConnection(self); | ||
// Break from the loop | ||
break; | ||
} | ||
} | ||
// If we are monitoring take the connection of the availableConnections | ||
if (workItem.monitoring) { | ||
moveConnectionBetween(connection, self.availableConnections, self.inUseConnections); | ||
} | ||
// Get actual binary commands | ||
var buffer = workItem.buffer; | ||
// Track the executing commands on the mongo server | ||
// as long as there is an expected response | ||
if (!workItem.noResponse) { | ||
connection.workItems.push(workItem); | ||
} | ||
// If we are monitoring take the connection of the availableConnections | ||
if (workItem.monitoring) { | ||
moveConnectionBetween(connection, self.availableConnections, self.inUseConnections); | ||
} | ||
// We have a custom socketTimeout | ||
if (!workItem.immediateRelease && typeof workItem.socketTimeout === 'number') { | ||
connection.setSocketTimeout(workItem.socketTimeout); | ||
} | ||
// Track the executing commands on the mongo server | ||
// as long as there is an expected response | ||
if (!workItem.noResponse) { | ||
connection.workItems.push(workItem); | ||
} | ||
// Capture if write was successful | ||
var writeSuccessful = true; | ||
// We have a custom socketTimeout | ||
if (!workItem.immediateRelease && typeof workItem.socketTimeout === 'number') { | ||
connection.setSocketTimeout(workItem.socketTimeout); | ||
} | ||
// Put operation on the wire | ||
if (Array.isArray(buffer)) { | ||
for (i = 0; i < buffer.length; i++) { | ||
writeSuccessful = connection.write(buffer[i]); | ||
} | ||
} else { | ||
writeSuccessful = connection.write(buffer); | ||
} | ||
// Capture if write was successful | ||
var writeSuccessful = true; | ||
// if the command is designated noResponse, call the callback immeditely | ||
if (workItem.noResponse && typeof workItem.cb === 'function') { | ||
workItem.cb(null, null); | ||
} | ||
if (writeSuccessful && workItem.immediateRelease && self.authenticating) { | ||
removeConnection(self, connection); | ||
self.nonAuthenticatedConnections.push(connection); | ||
} else if (writeSuccessful === false) { | ||
// If write not successful put back on queue | ||
self.queue.unshift(workItem); | ||
// Remove the disconnected connection | ||
removeConnection(self, connection); | ||
// Flush any monitoring operations in the queue, failing fast | ||
flushMonitoringOperations(self.queue); | ||
} | ||
} else { | ||
// Remove the disconnected connection | ||
removeConnection(self, connection); | ||
// Flush any monitoring operations in the queue, failing fast | ||
flushMonitoringOperations(self.queue); | ||
// Put operation on the wire | ||
if (Array.isArray(buffer)) { | ||
for (let i = 0; i < buffer.length; i++) { | ||
writeSuccessful = connection.write(buffer[i]); | ||
} | ||
} else { | ||
writeSuccessful = connection.write(buffer); | ||
} | ||
}); | ||
// if the command is designated noResponse, call the callback immeditely | ||
if (workItem.noResponse && typeof workItem.cb === 'function') { | ||
workItem.cb(null, null); | ||
} | ||
if (writeSuccessful === false) { | ||
// If write not successful put back on queue | ||
self.queue.unshift(workItem); | ||
// Remove the disconnected connection | ||
removeConnection(self, connection); | ||
// Flush any monitoring operations in the queue, failing fast | ||
flushMonitoringOperations(self.queue); | ||
break; | ||
} | ||
} | ||
self.executing = false; | ||
@@ -1631,0 +1285,0 @@ }; |
'use strict'; | ||
var f = require('util').format, | ||
require_optional = require('require_optional'); | ||
const require_optional = require('require_optional'); | ||
// Set property function | ||
var setProperty = function(obj, prop, flag, values) { | ||
Object.defineProperty(obj, prop.name, { | ||
enumerable: true, | ||
set: function(value) { | ||
if (typeof value !== 'boolean') throw new Error(f('%s required a boolean', prop.name)); | ||
// Flip the bit to 1 | ||
if (value === true) values.flags |= flag; | ||
// Flip the bit to 0 if it's set, otherwise ignore | ||
if (value === false && (values.flags & flag) === flag) values.flags ^= flag; | ||
prop.value = value; | ||
}, | ||
get: function() { | ||
return prop.value; | ||
} | ||
}); | ||
}; | ||
// Set property function | ||
var getProperty = function(obj, propName, fieldName, values, func) { | ||
Object.defineProperty(obj, propName, { | ||
enumerable: true, | ||
get: function() { | ||
// Not parsed yet, parse it | ||
if (values[fieldName] == null && obj.isParsed && !obj.isParsed()) { | ||
obj.parse(); | ||
} | ||
// Do we have a post processing function | ||
if (typeof func === 'function') return func(values[fieldName]); | ||
// Return raw value | ||
return values[fieldName]; | ||
} | ||
}); | ||
}; | ||
// Set simple property | ||
var getSingleProperty = function(obj, name, value) { | ||
Object.defineProperty(obj, name, { | ||
enumerable: true, | ||
get: function() { | ||
return value; | ||
} | ||
}); | ||
}; | ||
// Shallow copy | ||
var copy = function(fObj, tObj) { | ||
tObj = tObj || {}; | ||
for (var name in fObj) tObj[name] = fObj[name]; | ||
return tObj; | ||
}; | ||
var debugOptions = function(debugFields, options) { | ||
function debugOptions(debugFields, options) { | ||
var finaloptions = {}; | ||
@@ -66,5 +12,5 @@ debugFields.forEach(function(n) { | ||
return finaloptions; | ||
}; | ||
} | ||
var retrieveBSON = function() { | ||
function retrieveBSON() { | ||
var BSON = require('bson'); | ||
@@ -82,13 +28,13 @@ BSON.native = false; | ||
return BSON; | ||
}; | ||
} | ||
// Throw an error if an attempt to use Snappy is made when Snappy is not installed | ||
var noSnappyWarning = function() { | ||
function noSnappyWarning() { | ||
throw new Error( | ||
'Attempted to use Snappy compression, but Snappy is not installed. Install or disable Snappy compression and try again.' | ||
); | ||
}; | ||
} | ||
// Facilitate loading Snappy optionally | ||
var retrieveSnappy = function() { | ||
function retrieveSnappy() { | ||
var snappy = null; | ||
@@ -107,10 +53,8 @@ try { | ||
return snappy; | ||
} | ||
module.exports = { | ||
debugOptions, | ||
retrieveBSON, | ||
retrieveSnappy | ||
}; | ||
exports.setProperty = setProperty; | ||
exports.getProperty = getProperty; | ||
exports.getSingleProperty = getSingleProperty; | ||
exports.copy = copy; | ||
exports.debugOptions = debugOptions; | ||
exports.retrieveBSON = retrieveBSON; | ||
exports.retrieveSnappy = retrieveSnappy; |
@@ -10,3 +10,3 @@ 'use strict'; | ||
const collationNotSupported = require('./utils').collationNotSupported; | ||
const wireProtocol = require('./wireprotocol'); | ||
const BSON = retrieveBSON(); | ||
@@ -227,10 +227,3 @@ const Long = BSON.Long; | ||
this.server.wireProtocolHandler.getMore( | ||
this.server, | ||
this.ns, | ||
this.cursorState, | ||
batchSize, | ||
this.options, | ||
callback | ||
); | ||
wireProtocol.getMore(this.server, this.ns, this.cursorState, batchSize, this.options, callback); | ||
}; | ||
@@ -344,3 +337,3 @@ | ||
this.server.wireProtocolHandler.killCursor(this.server, this.ns, this.cursorState, callback); | ||
wireProtocol.killCursors(this.server, this.ns, this.cursorState, callback); | ||
}; | ||
@@ -619,3 +612,12 @@ | ||
return cursor.topology.selectServer(cursor.options, (err, server) => { | ||
// Very explicitly choose what is passed to selectServer | ||
const serverSelectOptions = {}; | ||
if (cursor.cursorState.session) { | ||
serverSelectOptions.session = cursor.cursorState.session; | ||
} | ||
if (cursor.options.readPreference) { | ||
serverSelectOptions.readPreference = cursor.options.readPreference; | ||
} | ||
return cursor.topology.selectServer(serverSelectOptions, (err, server) => { | ||
if (err) { | ||
@@ -714,3 +716,4 @@ const disconnectHandler = cursor.disconnectHandler; | ||
// Otherwise fall back to regular find path | ||
cursor.cursorState.cursorId = result.cursorId; | ||
const cursorId = result.cursorId || 0; | ||
cursor.cursorState.cursorId = Long.fromNumber(cursorId); | ||
cursor.cursorState.documents = result.documents; | ||
@@ -740,3 +743,3 @@ cursor.cursorState.lastCursorId = result.cursorId; | ||
if (cursor.cmd.find != null) { | ||
cursor.server.wireProtocolHandler.query( | ||
wireProtocol.query( | ||
cursor.server, | ||
@@ -753,3 +756,3 @@ cursor.ns, | ||
cursor.query = cursor.server.wireProtocolHandler.command( | ||
cursor.query = wireProtocol.command( | ||
cursor.server, | ||
@@ -756,0 +759,0 @@ cursor.ns, |
@@ -45,2 +45,6 @@ 'use strict'; | ||
} | ||
hasErrorLabel(label) { | ||
return this.errorLabels && this.errorLabels.indexOf(label) !== -1; | ||
} | ||
} | ||
@@ -47,0 +51,0 @@ |
@@ -125,3 +125,11 @@ 'use strict'; | ||
*/ | ||
function monitorServer(server) { | ||
function monitorServer(server, options) { | ||
options = options || {}; | ||
const heartbeatFrequencyMS = options.heartbeatFrequencyMS || 10000; | ||
if (options.initial === true) { | ||
server.s.monitorId = setTimeout(() => monitorServer(server), heartbeatFrequencyMS); | ||
return; | ||
} | ||
// executes a single check of a server | ||
@@ -134,2 +142,5 @@ const checkServer = callback => { | ||
// NOTE: legacy monitoring event | ||
process.nextTick(() => server.emit('monitoring', server)); | ||
server.command( | ||
@@ -142,3 +153,3 @@ 'admin.$cmd', | ||
}, | ||
function(err, result) { | ||
(err, result) => { | ||
let duration = calculateDurationInMs(start); | ||
@@ -173,6 +184,3 @@ | ||
// schedule the next monitoring process | ||
server.s.monitorId = setTimeout( | ||
() => monitorServer(server), | ||
server.s.options.heartbeatFrequencyMS | ||
); | ||
server.s.monitorId = setTimeout(() => monitorServer(server), heartbeatFrequencyMS); | ||
}; | ||
@@ -191,17 +199,20 @@ | ||
// change its type to `Unknown` only after retrying once. | ||
server.s.pool.reset(() => { | ||
// otherwise re-attempt monitoring once | ||
checkServer((error, isMaster) => { | ||
if (error) { | ||
server.s.monitoring = false; | ||
// TODO: we need to reset the pool here | ||
// we revert to an `Unknown` by emitting a default description with no isMaster | ||
server.emit( | ||
'descriptionReceived', | ||
new ServerDescription(server.description.address, null, { error }) | ||
); | ||
return checkServer((err, isMaster) => { | ||
if (err) { | ||
server.s.monitoring = false; | ||
// we do not reschedule monitoring in this case | ||
return; | ||
} | ||
// revert to `Unknown` by emitting a default description with no isMaster | ||
server.emit('descriptionReceived', new ServerDescription(server.description.address)); | ||
// do not reschedule monitoring in this case | ||
return; | ||
} | ||
successHandler(isMaster); | ||
successHandler(isMaster); | ||
}); | ||
}); | ||
@@ -208,0 +219,0 @@ }); |
@@ -25,2 +25,6 @@ 'use strict'; | ||
'maxWireVersion', | ||
'maxBsonObjectSize', | ||
'maxMessageSizeBytes', | ||
'maxWriteBatchSize', | ||
'compression', | ||
'me', | ||
@@ -35,3 +39,6 @@ 'hosts', | ||
'primary', | ||
'logicalSessionTimeoutMinutes' | ||
'logicalSessionTimeoutMinutes', | ||
'saslSupportedMechs', | ||
'__nodejs_mock_server__', | ||
'$clusterTime' | ||
]; | ||
@@ -67,3 +74,3 @@ | ||
this.address = address; | ||
this.error = null; | ||
this.error = options.error || null; | ||
this.roundTripTime = options.roundTripTime || 0; | ||
@@ -81,2 +88,3 @@ this.lastUpdateTime = Date.now(); | ||
// normalize case for hosts | ||
if (this.me) this.me = this.me.toLowerCase(); | ||
this.hosts = this.hosts.map(host => host.toLowerCase()); | ||
@@ -83,0 +91,0 @@ this.passives = this.passives.map(host => host.toLowerCase()); |
@@ -11,2 +11,5 @@ 'use strict'; | ||
/** | ||
* Returns a server selector that selects for writable servers | ||
*/ | ||
function writableServerSelector() { | ||
@@ -18,3 +21,11 @@ return function(topologyDescription, servers) { | ||
// reducers | ||
/** | ||
* Reduces the passed in array of servers by the rules of the "Max Staleness" specification | ||
* found here: https://github.com/mongodb/specifications/blob/master/source/max-staleness/max-staleness.rst | ||
* | ||
* @param {ReadPreference} readPreference The read preference providing max staleness guidance | ||
* @param {topologyDescription} topologyDescription The topology description | ||
* @param {ServerDescription[]} servers The list of server descriptions to be reduced | ||
* @return {ServerDescription[]} The list of servers that satisfy the requirements of max staleness | ||
*/ | ||
function maxStalenessReducer(readPreference, topologyDescription, servers) { | ||
@@ -29,3 +40,3 @@ if (readPreference.maxStalenessSeconds == null || readPreference.maxStalenessSeconds < 0) { | ||
if (maxStaleness < maxStalenessVariance) { | ||
throw MongoError(`maxStalenessSeconds must be at least ${maxStalenessVariance} seconds`); | ||
throw new MongoError(`maxStalenessSeconds must be at least ${maxStalenessVariance} seconds`); | ||
} | ||
@@ -67,2 +78,8 @@ | ||
/** | ||
* Determines whether a server's tags match a given set of tags | ||
* | ||
* @param {String[]} tagSet The requested tag set to match | ||
* @param {String[]} serverTags The server's tags | ||
*/ | ||
function tagSetMatch(tagSet, serverTags) { | ||
@@ -81,2 +98,9 @@ const keys = Object.keys(tagSet); | ||
/** | ||
* Reduces a set of server descriptions based on tags requested by the read preference | ||
* | ||
* @param {ReadPreference} readPreference The read preference providing the requested tags | ||
* @param {ServerDescription[]} servers The list of server descriptions to reduce | ||
* @return {ServerDescription[]} The list of servers matching the requested tags | ||
*/ | ||
function tagSetReducer(readPreference, servers) { | ||
@@ -105,2 +129,11 @@ if ( | ||
/** | ||
* Reduces a list of servers to ensure they fall within an acceptable latency window. This is | ||
* further specified in the "Server Selection" specification, found here: | ||
* https://github.com/mongodb/specifications/blob/master/source/server-selection/server-selection.rst | ||
* | ||
* @param {topologyDescription} topologyDescription The topology description | ||
* @param {ServerDescription[]} servers The list of servers to reduce | ||
* @returns {ServerDescription[]} The servers which fall within an acceptable latency window | ||
*/ | ||
function latencyWindowReducer(topologyDescription, servers) { | ||
@@ -137,2 +170,7 @@ const low = servers.reduce( | ||
/** | ||
* Returns a function which selects servers based on a provided read preference | ||
* | ||
* @param {ReadPreference} readPreference The read preference to select with | ||
*/ | ||
function readPreferenceServerSelector(readPreference) { | ||
@@ -139,0 +177,0 @@ if (!readPreference.isValid()) { |
@@ -6,6 +6,3 @@ 'use strict'; | ||
const relayEvents = require('../utils').relayEvents; | ||
const calculateDurationInMs = require('../utils').calculateDurationInMs; | ||
const Query = require('../connection/commands').Query; | ||
const TwoSixWireProtocolSupport = require('../wireprotocol/2_6_support'); | ||
const ThreeTwoWireProtocolSupport = require('../wireprotocol/3_2_support'); | ||
const wireProtocol = require('../wireprotocol'); | ||
const BSON = require('../connection/utils').retrieveBSON(); | ||
@@ -17,3 +14,39 @@ const createClientInfo = require('../topologies/shared').createClientInfo; | ||
const monitorServer = require('./monitoring').monitorServer; | ||
const MongoParseError = require('../error').MongoParseError; | ||
const MongoNetworkError = require('../error').MongoNetworkError; | ||
const collationNotSupported = require('../utils').collationNotSupported; | ||
const debugOptions = require('../connection/utils').debugOptions; | ||
// Used for filtering out fields for logging | ||
const DEBUG_FIELDS = [ | ||
'reconnect', | ||
'reconnectTries', | ||
'reconnectInterval', | ||
'emitError', | ||
'cursorFactory', | ||
'host', | ||
'port', | ||
'size', | ||
'keepAlive', | ||
'keepAliveInitialDelay', | ||
'noDelay', | ||
'connectionTimeout', | ||
'checkServerIdentity', | ||
'socketTimeout', | ||
'ssl', | ||
'ca', | ||
'crl', | ||
'cert', | ||
'key', | ||
'rejectUnauthorized', | ||
'promoteLongs', | ||
'promoteValues', | ||
'promoteBuffers', | ||
'servername' | ||
]; | ||
const STATE_DISCONNECTED = 0; | ||
const STATE_CONNECTING = 1; | ||
const STATE_CONNECTED = 2; | ||
/** | ||
@@ -32,3 +65,3 @@ * | ||
*/ | ||
constructor(description, options) { | ||
constructor(description, options, topology) { | ||
super(); | ||
@@ -49,4 +82,10 @@ | ||
monitoring: false, | ||
// the implementation of the monitoring method | ||
monitorFunction: options.monitorFunction || monitorServer, | ||
// the connection pool | ||
pool: null | ||
pool: null, | ||
// the server state | ||
state: STATE_DISCONNECTED, | ||
credentials: options.credentials, | ||
topology | ||
}; | ||
@@ -65,4 +104,2 @@ } | ||
* Initiate server connect | ||
* | ||
* @param {Array} [options.auth] Array of auth options to apply on connect | ||
*/ | ||
@@ -78,11 +115,23 @@ connect(options) { | ||
// create a pool | ||
this.s.pool = new Pool(this, Object.assign(this.s.options, options, { bson: this.s.bson })); | ||
const addressParts = this.description.address.split(':'); | ||
const poolOptions = Object.assign( | ||
{ host: addressParts[0], port: parseInt(addressParts[1], 10) }, | ||
this.s.options, | ||
options, | ||
{ bson: this.s.bson } | ||
); | ||
// Set up listeners | ||
// NOTE: this should only be the case if we are connecting to a single server | ||
poolOptions.reconnect = true; | ||
this.s.pool = new Pool(this, poolOptions); | ||
// setup listeners | ||
this.s.pool.on('connect', connectEventHandler(this)); | ||
this.s.pool.on('close', closeEventHandler(this)); | ||
this.s.pool.on('close', errorEventHandler(this)); | ||
this.s.pool.on('error', errorEventHandler(this)); | ||
this.s.pool.on('parseError', parseErrorEventHandler(this)); | ||
// this.s.pool.on('error', errorEventHandler(this)); | ||
// it is unclear whether consumers should even know about these events | ||
// this.s.pool.on('timeout', timeoutEventHandler(this)); | ||
// this.s.pool.on('parseError', errorEventHandler(this)); | ||
// this.s.pool.on('reconnect', reconnectEventHandler(this)); | ||
@@ -94,2 +143,4 @@ // this.s.pool.on('reconnectFailed', errorEventHandler(this)); | ||
this.s.state = STATE_CONNECTING; | ||
// If auth settings have been provided, use them | ||
@@ -107,10 +158,29 @@ if (options.auth) { | ||
* | ||
* @param {Boolean} [options.emitClose=false] Emit close event on destroy | ||
* @param {Boolean} [options.emitDestroy=false] Emit destroy event on destroy | ||
* @param {Boolean} [options.force=false] Force destroy the pool | ||
*/ | ||
destroy(callback) { | ||
if (typeof callback === 'function') { | ||
callback(null, null); | ||
destroy(options, callback) { | ||
if (typeof options === 'function') (callback = options), (options = {}); | ||
options = Object.assign({}, { force: false }, options); | ||
if (!this.s.pool) { | ||
this.s.state = STATE_DISCONNECTED; | ||
if (typeof callback === 'function') { | ||
callback(null, null); | ||
} | ||
return; | ||
} | ||
['close', 'error', 'timeout', 'parseError', 'connect'].forEach(event => { | ||
this.s.pool.removeAllListeners(event); | ||
}); | ||
if (this.s.monitorId) { | ||
clearTimeout(this.s.monitorId); | ||
} | ||
this.s.pool.destroy(options.force, err => { | ||
this.s.state = STATE_DISCONNECTED; | ||
callback(err); | ||
}); | ||
} | ||
@@ -122,6 +192,7 @@ | ||
*/ | ||
monitor() { | ||
if (this.s.monitoring) return; | ||
monitor(options) { | ||
options = options || {}; | ||
if (this.s.state !== STATE_CONNECTED || this.s.monitoring) return; | ||
if (this.s.monitorId) clearTimeout(this.s.monitorId); | ||
monitorServer(this); | ||
this.s.monitorFunction(this, options); | ||
} | ||
@@ -158,8 +229,12 @@ | ||
this.s.logger.debug( | ||
`executing command [${JSON.stringify({ ns, cmd, options })}] against ${this.name}` | ||
`executing command [${JSON.stringify({ | ||
ns, | ||
cmd, | ||
options: debugOptions(DEBUG_FIELDS, options) | ||
})}] against ${this.name}` | ||
); | ||
} | ||
// Check if we have collation support | ||
if (this.description.maxWireVersion < 5 && cmd.collation) { | ||
// error if collation not supported | ||
if (collationNotSupported(this, cmd)) { | ||
callback(new MongoError(`server ${this.name} does not support collation`)); | ||
@@ -169,23 +244,3 @@ return; | ||
// Create the query object | ||
const query = this.s.wireProtocolHandler.command(this, ns, cmd, {}, options); | ||
// Set slave OK of the query | ||
query.slaveOk = options.readPreference ? options.readPreference.slaveOk() : false; | ||
// write options | ||
const writeOptions = { | ||
raw: typeof options.raw === 'boolean' ? options.raw : false, | ||
promoteLongs: typeof options.promoteLongs === 'boolean' ? options.promoteLongs : true, | ||
promoteValues: typeof options.promoteValues === 'boolean' ? options.promoteValues : true, | ||
promoteBuffers: typeof options.promoteBuffers === 'boolean' ? options.promoteBuffers : false, | ||
command: true, | ||
monitoring: typeof options.monitoring === 'boolean' ? options.monitoring : false, | ||
fullResult: typeof options.fullResult === 'boolean' ? options.fullResult : false, | ||
requestId: query.requestId, | ||
socketTimeout: typeof options.socketTimeout === 'number' ? options.socketTimeout : null, | ||
session: options.session || null | ||
}; | ||
// write the operation to the pool | ||
this.s.pool.write(query, writeOptions, callback); | ||
wireProtocol.command(this, ns, cmd, options, callback); | ||
} | ||
@@ -242,2 +297,11 @@ | ||
Object.defineProperty(Server.prototype, 'clusterTime', { | ||
get: function() { | ||
return this.s.topology.clusterTime; | ||
}, | ||
set: function(clusterTime) { | ||
this.s.topology.clusterTime = clusterTime; | ||
} | ||
}); | ||
function basicWriteValidations(server) { | ||
@@ -282,4 +346,3 @@ if (!server.s.pool) { | ||
// Check if we have collation support | ||
if (server.description.maxWireVersion < 5 && options.collation) { | ||
if (collationNotSupported(server, options)) { | ||
callback(new MongoError(`server ${this.name} does not support collation`)); | ||
@@ -289,134 +352,52 @@ return; | ||
// Execute write | ||
return server.s.wireProtocolHandler[op](server.s.pool, ns, server.s.bson, ops, options, callback); | ||
return wireProtocol[op](server, ns, ops, options, callback); | ||
} | ||
function saslSupportedMechs(options) { | ||
if (!options) { | ||
return {}; | ||
} | ||
function connectEventHandler(server) { | ||
return function(pool, conn) { | ||
const ismaster = conn.ismaster; | ||
server.s.lastIsMasterMS = conn.lastIsMasterMS; | ||
if (conn.agreedCompressor) { | ||
server.s.pool.options.agreedCompressor = conn.agreedCompressor; | ||
} | ||
const authArray = options.auth || []; | ||
const authMechanism = authArray[0] || options.authMechanism; | ||
const authSource = authArray[1] || options.authSource || options.dbName || 'admin'; | ||
const user = authArray[2] || options.user; | ||
if (conn.zlibCompressionLevel) { | ||
server.s.pool.options.zlibCompressionLevel = conn.zlibCompressionLevel; | ||
} | ||
if (typeof authMechanism === 'string' && authMechanism.toUpperCase() !== 'DEFAULT') { | ||
return {}; | ||
} | ||
if (conn.ismaster.$clusterTime) { | ||
const $clusterTime = conn.ismaster.$clusterTime; | ||
server.s.sclusterTime = $clusterTime; | ||
} | ||
if (!user) { | ||
return {}; | ||
} | ||
// log the connection event if requested | ||
if (server.s.logger.isInfo()) { | ||
server.s.logger.info( | ||
`server ${server.name} connected with ismaster [${JSON.stringify(ismaster)}]` | ||
); | ||
} | ||
return { saslSupportedMechs: `${authSource}.${user}` }; | ||
} | ||
// emit an event indicating that our description has changed | ||
server.emit('descriptionReceived', new ServerDescription(server.description.address, ismaster)); | ||
function extractIsMasterError(err, result) { | ||
if (err) return err; | ||
if (result && result.result && result.result.ok === 0) { | ||
return new MongoError(result.result); | ||
} | ||
// we are connected and handshaked (guaranteed by the pool) | ||
server.s.state = STATE_CONNECTED; | ||
server.emit('connect', server); | ||
}; | ||
} | ||
function executeServerHandshake(server, callback) { | ||
// construct an `ismaster` query | ||
const compressors = | ||
server.s.options.compression && server.s.options.compression.compressors | ||
? server.s.options.compression.compressors | ||
: []; | ||
function errorEventHandler(server) { | ||
return function(err) { | ||
if (err) { | ||
server.emit('error', new MongoNetworkError(err)); | ||
} | ||
const queryOptions = { numberToSkip: 0, numberToReturn: -1, checkKeys: false, slaveOk: true }; | ||
const query = new Query( | ||
server.s.bson, | ||
'admin.$cmd', | ||
Object.assign( | ||
{ ismaster: true, client: server.s.clientInfo, compression: compressors }, | ||
saslSupportedMechs(server.s.options) | ||
), | ||
queryOptions | ||
); | ||
// execute the query | ||
server.s.pool.write( | ||
query, | ||
{ socketTimeout: server.s.options.connectionTimeout || 2000 }, | ||
callback | ||
); | ||
} | ||
function configureWireProtocolHandler(ismaster) { | ||
// 3.2 wire protocol handler | ||
if (ismaster.maxWireVersion >= 4) { | ||
return new ThreeTwoWireProtocolSupport(); | ||
} | ||
// default to 2.6 wire protocol handler | ||
return new TwoSixWireProtocolSupport(); | ||
} | ||
function connectEventHandler(server) { | ||
return function() { | ||
// log information of received information if in info mode | ||
// if (server.s.logger.isInfo()) { | ||
// var object = err instanceof MongoError ? JSON.stringify(err) : {}; | ||
// server.s.logger.info(`server ${server.name} fired event ${event} out with message ${object}`); | ||
// } | ||
// begin initial server handshake | ||
const start = process.hrtime(); | ||
executeServerHandshake(server, (err, response) => { | ||
// Set initial lastIsMasterMS - is this needed? | ||
server.s.lastIsMasterMS = calculateDurationInMs(start); | ||
const serverError = extractIsMasterError(err, response); | ||
if (serverError) { | ||
server.emit('error', serverError); | ||
return; | ||
} | ||
// extract the ismaster from the server response | ||
const isMaster = response.result; | ||
// compression negotation | ||
if (isMaster && isMaster.compression) { | ||
const localCompressionInfo = server.s.options.compression; | ||
const localCompressors = localCompressionInfo.compressors; | ||
for (var i = 0; i < localCompressors.length; i++) { | ||
if (isMaster.compression.indexOf(localCompressors[i]) > -1) { | ||
server.s.pool.options.agreedCompressor = localCompressors[i]; | ||
break; | ||
} | ||
} | ||
if (localCompressionInfo.zlibCompressionLevel) { | ||
server.s.pool.options.zlibCompressionLevel = localCompressionInfo.zlibCompressionLevel; | ||
} | ||
} | ||
// configure the wire protocol handler | ||
server.s.wireProtocolHandler = configureWireProtocolHandler(isMaster); | ||
// log the connection event if requested | ||
if (server.s.logger.isInfo()) { | ||
server.s.logger.info( | ||
`server ${server.name} connected with ismaster [${JSON.stringify(isMaster)}]` | ||
); | ||
} | ||
// emit an event indicating that our description has changed | ||
server.emit( | ||
'descriptionReceived', | ||
new ServerDescription(server.description.address, isMaster) | ||
); | ||
// emit a connect event | ||
server.emit('connect', isMaster); | ||
}); | ||
server.emit('close'); | ||
}; | ||
} | ||
function closeEventHandler(server) { | ||
return function() { | ||
server.emit('close'); | ||
function parseErrorEventHandler(server) { | ||
return function(err) { | ||
server.s.state = STATE_DISCONNECTED; | ||
server.emit('error', new MongoParseError(err)); | ||
}; | ||
@@ -423,0 +404,0 @@ } |
@@ -31,3 +31,12 @@ 'use strict'; | ||
*/ | ||
constructor(topologyType, serverDescriptions, setName, maxSetVersion, maxElectionId, options) { | ||
constructor( | ||
topologyType, | ||
serverDescriptions, | ||
setName, | ||
maxSetVersion, | ||
maxElectionId, | ||
commonWireVersion, | ||
options, | ||
error | ||
) { | ||
options = options || {}; | ||
@@ -50,5 +59,9 @@ | ||
this.options = options; | ||
this.error = error; | ||
this.commonWireVersion = commonWireVersion || null; | ||
// determine server compatibility | ||
for (const serverDescription of this.servers.values()) { | ||
if (serverDescription.type === ServerType.Unknown) continue; | ||
if (serverDescription.minWireVersion > MAX_SUPPORTED_WIRE_VERSION) { | ||
@@ -84,15 +97,2 @@ this.compatible = false; | ||
/** | ||
* @returns The minimum reported wire version of all known servers | ||
*/ | ||
get commonWireVersion() { | ||
return Array.from(this.servers.values()) | ||
.filter(server => server.type !== ServerType.Unknown) | ||
.reduce( | ||
(min, server) => | ||
min == null ? server.maxWireVersion : Math.min(min, server.maxWireVersion), | ||
null | ||
); | ||
} | ||
/** | ||
* Returns a copy of this description updated with a given ServerDescription | ||
@@ -112,2 +112,4 @@ * | ||
let maxElectionId = this.maxElectionId; | ||
let commonWireVersion = this.commonWireVersion; | ||
let error = serverDescription.error || null; | ||
@@ -117,2 +119,11 @@ const serverType = serverDescription.type; | ||
// update common wire version | ||
if (serverDescription.maxWireVersion !== 0) { | ||
if (commonWireVersion == null) { | ||
commonWireVersion = serverDescription.maxWireVersion; | ||
} else { | ||
commonWireVersion = Math.min(commonWireVersion, serverDescription.maxWireVersion); | ||
} | ||
} | ||
// update the actual server description | ||
@@ -129,3 +140,5 @@ serverDescriptions.set(address, serverDescription); | ||
maxElectionId, | ||
this.options | ||
commonWireVersion, | ||
this.options, | ||
error | ||
); | ||
@@ -210,3 +223,5 @@ } | ||
maxElectionId, | ||
this.options | ||
commonWireVersion, | ||
this.options, | ||
error | ||
); | ||
@@ -213,0 +228,0 @@ } |
'use strict'; | ||
const EventEmitter = require('events'); | ||
const ServerDescription = require('./server_description').ServerDescription; | ||
const ServerType = require('./server_description').ServerType; | ||
const TopologyDescription = require('./topology_description').TopologyDescription; | ||
@@ -9,3 +10,2 @@ const TopologyType = require('./topology_description').TopologyType; | ||
const MongoTimeoutError = require('../error').MongoTimeoutError; | ||
const MongoNetworkError = require('../error').MongoNetworkError; | ||
const Server = require('./server'); | ||
@@ -17,6 +17,12 @@ const relayEvents = require('../utils').relayEvents; | ||
const isRetryableWritesSupported = require('../topologies/shared').isRetryableWritesSupported; | ||
const Cursor = require('./cursor'); | ||
const Cursor = require('../cursor'); | ||
const deprecate = require('util').deprecate; | ||
const BSON = require('../connection/utils').retrieveBSON(); | ||
const createCompressionInfo = require('../topologies/shared').createCompressionInfo; | ||
const isRetryableError = require('../error').isRetryableError; | ||
const MongoParseError = require('../error').MongoParseError; | ||
const ClientSession = require('../sessions').ClientSession; | ||
const createClientInfo = require('../topologies/shared').createClientInfo; | ||
const MongoError = require('../error').MongoError; | ||
const resolveClusterTime = require('../topologies/shared').resolveClusterTime; | ||
@@ -31,5 +37,27 @@ // Global state | ||
heartbeatFrequencyMS: 30000, | ||
minHeartbeatIntervalMS: 500 | ||
minHeartbeatFrequencyMS: 500 | ||
}; | ||
// events that we relay to the `Topology` | ||
const SERVER_RELAY_EVENTS = [ | ||
'serverHeartbeatStarted', | ||
'serverHeartbeatSucceeded', | ||
'serverHeartbeatFailed', | ||
'commandStarted', | ||
'commandSucceeded', | ||
'commandFailed', | ||
// NOTE: Legacy events | ||
'monitoring' | ||
]; | ||
// all events we listen to from `Server` instances | ||
const LOCAL_SERVER_EVENTS = SERVER_RELAY_EVENTS.concat([ | ||
'error', | ||
'connect', | ||
'descriptionReceived', | ||
'close', | ||
'ended' | ||
]); | ||
/** | ||
@@ -60,3 +88,3 @@ * A container of server instances representing a connection to a MongoDB topology. | ||
super(); | ||
if (typeof options === 'undefined') { | ||
if (typeof options === 'undefined' && typeof seedlist !== 'string') { | ||
options = seedlist; | ||
@@ -72,2 +100,6 @@ seedlist = []; | ||
seedlist = seedlist || []; | ||
if (typeof seedlist === 'string') { | ||
seedlist = parseStringSeedlist(seedlist); | ||
} | ||
options = Object.assign({}, TOPOLOGY_DEFAULTS, options); | ||
@@ -78,2 +110,3 @@ | ||
const serverDescriptions = seedlist.reduce((result, seed) => { | ||
if (seed.domain_socket) seed.host = seed.domain_socket; | ||
const address = seed.port ? `${seed.host}:${seed.port}` : `${seed.host}:27017`; | ||
@@ -88,3 +121,3 @@ result.set(address, new ServerDescription(address)); | ||
// passed in options | ||
options: Object.assign({}, options), | ||
options, | ||
// initial seedlist of servers to connect to | ||
@@ -99,2 +132,3 @@ seedlist: seedlist, | ||
null, | ||
null, | ||
options | ||
@@ -108,20 +142,13 @@ ), | ||
// the bson parser | ||
bson: | ||
options.bson || | ||
new BSON([ | ||
BSON.Binary, | ||
BSON.Code, | ||
BSON.DBRef, | ||
BSON.Decimal128, | ||
BSON.Double, | ||
BSON.Int32, | ||
BSON.Long, | ||
BSON.Map, | ||
BSON.MaxKey, | ||
BSON.MinKey, | ||
BSON.ObjectId, | ||
BSON.BSONRegExp, | ||
BSON.Symbol, | ||
BSON.Timestamp | ||
]) | ||
bson: options.bson || new BSON(), | ||
// a map of server instances to normalized addresses | ||
servers: new Map(), | ||
// Server Session Pool | ||
sessionPool: null, | ||
// Active client sessions | ||
sessions: [], | ||
// Promise library | ||
promiseLibrary: options.promiseLibrary || Promise, | ||
credentials: options.credentials, | ||
clusterTime: null | ||
}; | ||
@@ -131,2 +158,5 @@ | ||
this.s.options.compression = { compressors: createCompressionInfo(options) }; | ||
// add client info | ||
this.s.clientInfo = createClientInfo(options); | ||
} | ||
@@ -141,2 +171,6 @@ | ||
get parserType() { | ||
return BSON.native ? 'c++' : 'js'; | ||
} | ||
/** | ||
@@ -158,4 +192,8 @@ * All raw connections | ||
* @param {Array} [options.auth=null] Array of auth options to apply on connect | ||
* @param {function} [callback] An optional callback called once on the first connected server | ||
*/ | ||
connect(/* options */) { | ||
connect(options, callback) { | ||
if (typeof options === 'function') (callback = options), (options = {}); | ||
options = options || {}; | ||
// emit SDAM monitoring events | ||
@@ -176,2 +214,42 @@ this.emit('topologyOpening', new monitoring.TopologyOpeningEvent(this.s.id)); | ||
this.s.connected = true; | ||
// otherwise, wait for a server to properly connect based on user provided read preference, | ||
// or primary. | ||
translateReadPreference(options); | ||
const readPreference = options.readPreference || ReadPreference.primary; | ||
this.selectServer(readPreferenceServerSelector(readPreference), options, (err, server) => { | ||
if (err) { | ||
if (typeof callback === 'function') { | ||
callback(err, null); | ||
} else { | ||
this.emit('error', err); | ||
} | ||
return; | ||
} | ||
const errorHandler = err => { | ||
server.removeListener('connect', connectHandler); | ||
if (typeof callback === 'function') callback(err, null); | ||
}; | ||
const connectHandler = (_, err) => { | ||
server.removeListener('error', errorHandler); | ||
this.emit('open', err, this); | ||
this.emit('connect', this); | ||
if (typeof callback === 'function') callback(err, this); | ||
}; | ||
const STATE_CONNECTING = 1; | ||
if (server.s.state === STATE_CONNECTING) { | ||
server.once('error', errorHandler); | ||
server.once('connect', connectHandler); | ||
return; | ||
} | ||
connectHandler(); | ||
}); | ||
} | ||
@@ -182,14 +260,37 @@ | ||
*/ | ||
close(callback) { | ||
// destroy all child servers | ||
this.s.servers.forEach(server => server.destroy()); | ||
close(options, callback) { | ||
if (typeof options === 'function') (callback = options), (options = {}); | ||
options = options || {}; | ||
// emit an event for close | ||
this.emit('topologyClosed', new monitoring.TopologyClosedEvent(this.s.id)); | ||
if (this.s.sessionPool) { | ||
this.s.sessions.forEach(session => session.endSession()); | ||
this.s.sessionPool.endAllPooledSessions(); | ||
} | ||
this.s.connected = false; | ||
const servers = this.s.servers; | ||
if (servers.size === 0) { | ||
this.s.connected = false; | ||
if (typeof callback === 'function') { | ||
callback(null, null); | ||
} | ||
if (typeof callback === 'function') { | ||
callback(null, null); | ||
return; | ||
} | ||
// destroy all child servers | ||
let destroyed = 0; | ||
servers.forEach(server => | ||
destroyServer(server, this, () => { | ||
destroyed++; | ||
if (destroyed === servers.size) { | ||
// emit an event for close | ||
this.emit('topologyClosed', new monitoring.TopologyClosedEvent(this.s.id)); | ||
this.s.connected = false; | ||
if (typeof callback === 'function') { | ||
callback(null, null); | ||
} | ||
} | ||
}) | ||
); | ||
} | ||
@@ -201,6 +302,21 @@ | ||
* @param {function} [selector] An optional selector to select servers by, defaults to a random selection within a latency window | ||
* @param {object} [options] Optional settings related to server selection | ||
* @param {number} [options.serverSelectionTimeoutMS] How long to block for server selection before throwing an error | ||
* @param {function} callback The callback used to indicate success or failure | ||
* @return {Server} An instance of a `Server` meeting the criteria of the predicate provided | ||
*/ | ||
selectServer(selector, options, callback) { | ||
if (typeof options === 'function') (callback = options), (options = {}); | ||
if (typeof options === 'function') { | ||
callback = options; | ||
if (typeof selector !== 'function') { | ||
options = selector; | ||
translateReadPreference(options); | ||
const readPreference = options.readPreference || ReadPreference.primary; | ||
selector = readPreferenceServerSelector(readPreference); | ||
} else { | ||
options = {}; | ||
} | ||
} | ||
options = Object.assign( | ||
@@ -212,2 +328,11 @@ {}, | ||
const isSharded = this.description.type === TopologyType.Sharded; | ||
const session = options.session; | ||
const transaction = session && session.transaction; | ||
if (isSharded && transaction && transaction.server) { | ||
callback(null, transaction.server); | ||
return; | ||
} | ||
selectServers( | ||
@@ -220,3 +345,9 @@ this, | ||
if (err) return callback(err, null); | ||
callback(null, randomSelection(servers)); | ||
const selectedServer = randomSelection(servers); | ||
if (isSharded && transaction && transaction.isActive) { | ||
transaction.pinServer(selectedServer); | ||
} | ||
callback(null, selectedServer); | ||
} | ||
@@ -226,3 +357,46 @@ ); | ||
// Sessions related methods | ||
/** | ||
* @return Whether sessions are supported on the current topology | ||
*/ | ||
hasSessionSupport() { | ||
return this.description.logicalSessionTimeoutMinutes != null; | ||
} | ||
/** | ||
* Start a logical session | ||
*/ | ||
startSession(options, clientOptions) { | ||
const session = new ClientSession(this, this.s.sessionPool, options, clientOptions); | ||
session.once('ended', () => { | ||
this.s.sessions = this.s.sessions.filter(s => !s.equals(session)); | ||
}); | ||
this.s.sessions.push(session); | ||
return session; | ||
} | ||
/** | ||
* Send endSessions command(s) with the given session ids | ||
* | ||
* @param {Array} sessions The sessions to end | ||
* @param {function} [callback] | ||
*/ | ||
endSessions(sessions, callback) { | ||
if (!Array.isArray(sessions)) { | ||
sessions = [sessions]; | ||
} | ||
this.command( | ||
'admin.$cmd', | ||
{ endSessions: sessions }, | ||
{ readPreference: ReadPreference.primaryPreferred, noResponse: true }, | ||
() => { | ||
// intentionally ignored, per spec | ||
if (typeof callback === 'function') callback(); | ||
} | ||
); | ||
} | ||
/** | ||
* Update the internal TopologyDescription with a ServerDescription | ||
@@ -243,2 +417,6 @@ * | ||
this.s.description = this.s.description.update(serverDescription); | ||
if (this.s.description.compatibilityError) { | ||
this.emit('error', new MongoError(this.s.description.compatibilityError)); | ||
return; | ||
} | ||
@@ -259,2 +437,13 @@ // emit monitoring events for this change | ||
// Driver Sessions Spec: "Whenever a driver receives a cluster time from | ||
// a server it MUST compare it to the current highest seen cluster time | ||
// for the deployment. If the new cluster time is higher than the | ||
// highest seen cluster time it MUST become the new highest seen cluster | ||
// time. Two cluster times are compared using only the BsonTimestamp | ||
// value of the clusterTime embedded field." | ||
const clusterTime = serverDescription.$clusterTime; | ||
if (clusterTime) { | ||
resolveClusterTime(this, clusterTime); | ||
} | ||
this.emit( | ||
@@ -270,22 +459,9 @@ 'topologyDescriptionChanged', | ||
/** | ||
* Authenticate using a specified mechanism | ||
* | ||
* @param {String} mechanism The auth mechanism used for authentication | ||
* @param {String} db The db we are authenticating against | ||
* @param {Object} options Optional settings for the authenticating mechanism | ||
* @param {authResultCallback} callback A callback function | ||
*/ | ||
auth(mechanism, db, options, callback) { | ||
callback(null, null); | ||
auth(credentials, callback) { | ||
if (typeof credentials === 'function') (callback = credentials), (credentials = null); | ||
if (typeof callback === 'function') callback(null, true); | ||
} | ||
/** | ||
* Logout from a database | ||
* | ||
* @param {String} db The db we are logging out from | ||
* @param {authResultCallback} callback A callback function | ||
*/ | ||
logout(db, callback) { | ||
callback(null, null); | ||
logout(callback) { | ||
if (typeof callback === 'function') callback(null, true); | ||
} | ||
@@ -365,4 +541,6 @@ | ||
const readPreference = options.readPreference ? options.readPreference : ReadPreference.primary; | ||
this.selectServer(readPreferenceServerSelector(readPreference), (err, server) => { | ||
translateReadPreference(options); | ||
const readPreference = options.readPreference || ReadPreference.primary; | ||
this.selectServer(readPreferenceServerSelector(readPreference), options, (err, server) => { | ||
if (err) { | ||
@@ -373,3 +551,31 @@ callback(err, null); | ||
server.command(ns, cmd, options, callback); | ||
const willRetryWrite = | ||
!options.retrying && | ||
!!options.retryWrites && | ||
options.session && | ||
isRetryableWritesSupported(this) && | ||
!options.session.inTransaction() && | ||
isWriteCommand(cmd); | ||
const cb = (err, result) => { | ||
if (!err) return callback(null, result); | ||
if (!isRetryableError(err)) { | ||
return callback(err); | ||
} | ||
if (willRetryWrite) { | ||
const newOptions = Object.assign({}, options, { retrying: true }); | ||
return this.command(ns, cmd, newOptions, callback); | ||
} | ||
return callback(err); | ||
}; | ||
// increment and assign txnNumber | ||
if (willRetryWrite) { | ||
options.session.incrementTransactionNumber(); | ||
options.willRetryWrite = willRetryWrite; | ||
} | ||
server.command(ns, cmd, options, cb); | ||
}); | ||
@@ -398,7 +604,57 @@ } | ||
const CursorClass = options.cursorFactory || this.s.Cursor; | ||
translateReadPreference(options); | ||
return new CursorClass(this.s.bson, ns, cmd, options, topology, this.s.options); | ||
} | ||
get clientInfo() { | ||
return this.s.clientInfo; | ||
} | ||
// Legacy methods for compat with old topology types | ||
isConnected() { | ||
// console.log('not implemented: `isConnected`'); | ||
return true; | ||
} | ||
isDestroyed() { | ||
// console.log('not implemented: `isDestroyed`'); | ||
return false; | ||
} | ||
unref() { | ||
console.log('not implemented: `unref`'); | ||
} | ||
// NOTE: There are many places in code where we explicitly check the last isMaster | ||
// to do feature support detection. This should be done any other way, but for | ||
// now we will just return the first isMaster seen, which should suffice. | ||
lastIsMaster() { | ||
const serverDescriptions = Array.from(this.description.servers.values()); | ||
if (serverDescriptions.length === 0) return {}; | ||
const sd = serverDescriptions.filter(sd => sd.type !== ServerType.Unknown)[0]; | ||
const result = sd || { maxWireVersion: this.description.commonWireVersion }; | ||
return result; | ||
} | ||
get logicalSessionTimeoutMinutes() { | ||
return this.description.logicalSessionTimeoutMinutes; | ||
} | ||
get bson() { | ||
return this.s.bson; | ||
} | ||
} | ||
Object.defineProperty(Topology.prototype, 'clusterTime', { | ||
enumerable: true, | ||
get: function() { | ||
return this.s.clusterTime; | ||
}, | ||
set: function(clusterTime) { | ||
this.s.clusterTime = clusterTime; | ||
} | ||
}); | ||
// legacy aliases | ||
@@ -410,5 +666,41 @@ Topology.prototype.destroy = deprecate( | ||
const RETRYABLE_WRITE_OPERATIONS = ['findAndModify', 'insert', 'update', 'delete']; | ||
function isWriteCommand(command) { | ||
return RETRYABLE_WRITE_OPERATIONS.some(op => command[op]); | ||
} | ||
/** | ||
* Destroys a server, and removes all event listeners from the instance | ||
* | ||
* @param {Server} server | ||
*/ | ||
function destroyServer(server, topology, callback) { | ||
LOCAL_SERVER_EVENTS.forEach(event => server.removeAllListeners(event)); | ||
server.destroy(() => { | ||
topology.emit( | ||
'serverClosed', | ||
new monitoring.ServerClosedEvent(topology.s.id, server.description.address) | ||
); | ||
if (typeof callback === 'function') callback(null, null); | ||
}); | ||
} | ||
/** | ||
* Parses a basic seedlist in string form | ||
* | ||
* @param {string} seedlist The seedlist to parse | ||
*/ | ||
function parseStringSeedlist(seedlist) { | ||
return seedlist.split(',').map(seed => ({ | ||
host: seed.split(':')[0], | ||
port: seed.split(':')[1] || 27017 | ||
})); | ||
} | ||
function topologyTypeFromSeedlist(seedlist, options) { | ||
if (seedlist.length === 1 && !options.replicaSet) return TopologyType.Single; | ||
if (options.replicaSet) return TopologyType.ReplicaSetNoPrimary; | ||
const replicaSet = options.replicaSet || options.setName || options.rs_name; | ||
if (seedlist.length === 1 && !replicaSet) return TopologyType.Single; | ||
if (replicaSet) return TopologyType.ReplicaSetNoPrimary; | ||
return TopologyType.Unknown; | ||
@@ -432,5 +724,39 @@ } | ||
function selectServers(topology, selector, timeout, start, callback) { | ||
const duration = calculateDurationInMs(start); | ||
if (duration >= timeout) { | ||
return callback(new MongoTimeoutError(`Server selection timed out after ${timeout} ms`)); | ||
} | ||
// ensure we are connected | ||
if (!topology.s.connected) { | ||
topology.connect(); | ||
// we want to make sure we're still within the requested timeout window | ||
const failToConnectTimer = setTimeout(() => { | ||
topology.removeListener('connect', connectHandler); | ||
callback(new MongoTimeoutError('Server selection timed out waiting to connect')); | ||
}, timeout - duration); | ||
const connectHandler = () => { | ||
clearTimeout(failToConnectTimer); | ||
selectServers(topology, selector, timeout, process.hrtime(), callback); | ||
}; | ||
topology.once('connect', connectHandler); | ||
return; | ||
} | ||
// otherwise, attempt server selection | ||
const serverDescriptions = Array.from(topology.description.servers.values()); | ||
let descriptions; | ||
// support server selection by options with readPreference | ||
if (typeof selector === 'object') { | ||
const readPreference = selector.readPreference | ||
? selector.readPreference | ||
: ReadPreference.primary; | ||
selector = readPreferenceServerSelector(readPreference); | ||
} | ||
try { | ||
@@ -449,44 +775,52 @@ descriptions = selector | ||
const duration = calculateDurationInMs(start); | ||
if (duration >= timeout) { | ||
return callback(new MongoTimeoutError(`Server selection timed out after ${timeout} ms`)); | ||
} | ||
const retrySelection = () => { | ||
// ensure all server monitors attempt monitoring immediately | ||
topology.s.servers.forEach(server => server.monitor()); | ||
// ensure all server monitors attempt monitoring soon | ||
topology.s.servers.forEach(server => { | ||
setTimeout( | ||
() => server.monitor({ heartbeatFrequencyMS: topology.description.heartbeatFrequencyMS }), | ||
TOPOLOGY_DEFAULTS.minHeartbeatFrequencyMS | ||
); | ||
}); | ||
const iterationTimer = setTimeout(() => { | ||
callback(new MongoTimeoutError('Server selection timed out due to monitoring')); | ||
}, topology.s.minHeartbeatIntervalMS); | ||
topology.once('topologyDescriptionChanged', () => { | ||
const descriptionChangedHandler = () => { | ||
// successful iteration, clear the check timer | ||
clearTimeout(iterationTimer); | ||
if (topology.description.error) { | ||
callback(topology.description.error, null); | ||
return; | ||
} | ||
// topology description has changed due to monitoring, reattempt server selection | ||
selectServers(topology, selector, timeout, start, callback); | ||
}); | ||
}; | ||
}; | ||
// ensure we are connected | ||
if (!topology.s.connected) { | ||
topology.connect(); | ||
// we want to make sure we're still within the requested timeout window | ||
const failToConnectTimer = setTimeout(() => { | ||
callback(new MongoTimeoutError('Server selection timed out waiting to connect')); | ||
const iterationTimer = setTimeout(() => { | ||
topology.removeListener('topologyDescriptionChanged', descriptionChangedHandler); | ||
callback(new MongoTimeoutError(`Server selection timed out after ${timeout} ms`)); | ||
}, timeout - duration); | ||
topology.once('connect', () => { | ||
clearTimeout(failToConnectTimer); | ||
retrySelection(); | ||
}); | ||
topology.once('topologyDescriptionChanged', descriptionChangedHandler); | ||
}; | ||
return; | ||
} | ||
retrySelection(); | ||
} | ||
function createAndConnectServer(topology, serverDescription) { | ||
topology.emit( | ||
'serverOpening', | ||
new monitoring.ServerOpeningEvent(topology.s.id, serverDescription.address) | ||
); | ||
const server = new Server(serverDescription, topology.s.options, topology); | ||
relayEvents(server, topology, SERVER_RELAY_EVENTS); | ||
server.once('connect', serverConnectEventHandler(server, topology)); | ||
server.on('descriptionReceived', topology.serverUpdateHandler.bind(topology)); | ||
server.on('error', serverErrorEventHandler(server, topology)); | ||
server.on('close', () => topology.emit('close', server)); | ||
server.connect(); | ||
return server; | ||
} | ||
/** | ||
@@ -501,19 +835,4 @@ * Create `Server` instances for all initially known servers, connect them, and assign | ||
topology.s.servers = serverDescriptions.reduce((servers, serverDescription) => { | ||
// publish an open event for each ServerDescription created | ||
topology.emit( | ||
'serverOpening', | ||
new monitoring.ServerOpeningEvent(topology.s.id, serverDescription.address) | ||
); | ||
const server = new Server(serverDescription, topology.s.options); | ||
relayEvents(server, topology, [ | ||
'serverHeartbeatStarted', | ||
'serverHeartbeatSucceeded', | ||
'serverHeartbeatFailed' | ||
]); | ||
server.on('descriptionReceived', topology.serverUpdateHandler.bind(topology)); | ||
server.on('connect', serverConnectEventHandler(server, topology)); | ||
const server = createAndConnectServer(topology, serverDescription); | ||
servers.set(serverDescription.address, server); | ||
server.connect(); | ||
return servers; | ||
@@ -523,7 +842,7 @@ }, new Map()); | ||
function updateServers(topology, currentServerDescription) { | ||
function updateServers(topology, incomingServerDescription) { | ||
// update the internal server's description | ||
if (topology.s.servers.has(currentServerDescription.address)) { | ||
const server = topology.s.servers.get(currentServerDescription.address); | ||
server.s.description = currentServerDescription; | ||
if (topology.s.servers.has(incomingServerDescription.address)) { | ||
const server = topology.s.servers.get(incomingServerDescription.address); | ||
server.s.description = incomingServerDescription; | ||
} | ||
@@ -534,18 +853,4 @@ | ||
if (!topology.s.servers.has(serverDescription.address)) { | ||
topology.emit( | ||
'serverOpening', | ||
new monitoring.ServerOpeningEvent(topology.s.id, serverDescription.address) | ||
); | ||
const server = new Server(serverDescription, topology.s.options); | ||
relayEvents(server, topology, [ | ||
'serverHeartbeatStarted', | ||
'serverHeartbeatSucceeded', | ||
'serverHeartbeatFailed' | ||
]); | ||
server.on('descriptionReceived', topology.serverUpdateHandler.bind(topology)); | ||
server.on('connect', serverConnectEventHandler(server, topology)); | ||
const server = createAndConnectServer(topology, serverDescription); | ||
topology.s.servers.set(serverDescription.address, server); | ||
server.connect(); | ||
} | ||
@@ -564,5 +869,4 @@ } | ||
server.destroy(() => | ||
topology.emit('serverClosed', new monitoring.ServerClosedEvent(topology.s.id, serverAddress)) | ||
); | ||
// prepare server for garbage collection | ||
destroyServer(server, topology); | ||
} | ||
@@ -572,7 +876,26 @@ } | ||
function serverConnectEventHandler(server, topology) { | ||
return function(/* ismaster */) { | ||
topology.emit('connect', topology); | ||
return function(/* isMaster, err */) { | ||
server.monitor({ | ||
initial: true, | ||
heartbeatFrequencyMS: topology.description.heartbeatFrequencyMS | ||
}); | ||
}; | ||
} | ||
function serverErrorEventHandler(server, topology) { | ||
return function(err) { | ||
topology.emit( | ||
'serverClosed', | ||
new monitoring.ServerClosedEvent(topology.s.id, server.description.address) | ||
); | ||
if (err instanceof MongoParseError) { | ||
resetServerState(server, err, { clearPool: true }); | ||
return; | ||
} | ||
resetServerState(server, err); | ||
}; | ||
} | ||
function executeWriteOperation(args, options, callback) { | ||
@@ -590,3 +913,3 @@ if (typeof options === 'function') (callback = options), (options = {}); | ||
!args.retrying && | ||
options.retryWrites && | ||
!!options.retryWrites && | ||
options.session && | ||
@@ -596,3 +919,3 @@ isRetryableWritesSupported(topology) && | ||
topology.selectServer(writableServerSelector(), (err, server) => { | ||
topology.selectServer(writableServerSelector(), options, (err, server) => { | ||
if (err) { | ||
@@ -605,3 +928,3 @@ callback(err, null); | ||
if (!err) return callback(null, result); | ||
if (!(err instanceof MongoNetworkError) && !err.message.match(/not master/)) { | ||
if (!isRetryableError(err)) { | ||
return callback(err); | ||
@@ -630,8 +953,52 @@ } | ||
server[op](ns, ops, options, handler); | ||
}); | ||
} | ||
// we need to increment the statement id if we're in a transaction | ||
if (options.session && options.session.inTransaction()) { | ||
options.session.incrementStatementId(ops.length); | ||
/** | ||
* Resets the internal state of this server to `Unknown` by simulating an empty ismaster | ||
* | ||
* @private | ||
* @param {Server} server | ||
* @param {MongoError} error The error that caused the state reset | ||
* @param {object} [options] Optional settings | ||
* @param {boolean} [options.clearPool=false] Pool should be cleared out on state reset | ||
*/ | ||
function resetServerState(server, error, options) { | ||
options = Object.assign({}, { clearPool: false }, options); | ||
function resetState() { | ||
server.emit( | ||
'descriptionReceived', | ||
new ServerDescription(server.description.address, null, { error }) | ||
); | ||
} | ||
if (options.clearPool && server.pool) { | ||
server.pool.reset(() => resetState()); | ||
return; | ||
} | ||
resetState(); | ||
} | ||
function translateReadPreference(options) { | ||
if (options.readPreference == null) { | ||
return; | ||
} | ||
let r = options.readPreference; | ||
if (typeof r === 'string') { | ||
options.readPreference = new ReadPreference(r); | ||
} else if (r && !(r instanceof ReadPreference) && typeof r === 'object') { | ||
const mode = r.mode || r.preference; | ||
if (mode && typeof mode === 'string') { | ||
options.readPreference = new ReadPreference(mode, r.tags, { | ||
maxStalenessSeconds: r.maxStalenessSeconds | ||
}); | ||
} | ||
}); | ||
} else if (!(r instanceof ReadPreference)) { | ||
throw new TypeError('Invalid read preference: ' + r); | ||
} | ||
return options; | ||
} | ||
@@ -702,2 +1069,23 @@ | ||
/** | ||
* An event emitted indicating a command was started, if command monitoring is enabled | ||
* | ||
* @event Topology#commandStarted | ||
* @type {object} | ||
*/ | ||
/** | ||
* An event emitted indicating a command succeeded, if command monitoring is enabled | ||
* | ||
* @event Topology#commandSucceeded | ||
* @type {object} | ||
*/ | ||
/** | ||
* An event emitted indicating a command failed, if command monitoring is enabled | ||
* | ||
* @event Topology#commandFailed | ||
* @type {object} | ||
*/ | ||
module.exports = Topology; |
@@ -14,2 +14,3 @@ 'use strict'; | ||
const TxnState = require('./transactions').TxnState; | ||
const isPromiseLike = require('./utils').isPromiseLike; | ||
@@ -247,4 +248,119 @@ function assertAlive(session, callback) { | ||
} | ||
/** | ||
* A user provided function to be run within a transaction | ||
* | ||
* @callback WithTransactionCallback | ||
* @param {ClientSession} session The parent session of the transaction running the operation. This should be passed into each operation within the lambda. | ||
* @returns {Promise} The resulting Promise of operations run within this transaction | ||
*/ | ||
/** | ||
* Runs a provided lambda within a transaction, retrying either the commit operation | ||
* or entire transaction as needed (and when the error permits) to better ensure that | ||
* the transaction can complete successfully. | ||
* | ||
* IMPORTANT: This method requires the user to return a Promise, all lambdas that do not | ||
* return a Promise will result in undefined behavior. | ||
* | ||
* @param {WithTransactionCallback} fn | ||
* @param {TransactionOptions} [options] Optional settings for the transaction | ||
*/ | ||
withTransaction(fn, options) { | ||
const startTime = Date.now(); | ||
return attemptTransaction(this, startTime, fn, options); | ||
} | ||
} | ||
const MAX_WITH_TRANSACTION_TIMEOUT = 120000; | ||
const UNSATISFIABLE_WRITE_CONCERN_CODE = 100; | ||
const UNKNOWN_REPL_WRITE_CONCERN_CODE = 79; | ||
const NON_DETERMINISTIC_WRITE_CONCERN_ERRORS = new Set([ | ||
'CannotSatisfyWriteConcern', | ||
'UnknownReplWriteConcern', | ||
'UnsatisfiableWriteConcern' | ||
]); | ||
function hasNotTimedOut(startTime, max) { | ||
return Date.now() - startTime < max; | ||
} | ||
function isUnknownTransactionCommitResult(err) { | ||
return ( | ||
!NON_DETERMINISTIC_WRITE_CONCERN_ERRORS.has(err.codeName) && | ||
err.code !== UNSATISFIABLE_WRITE_CONCERN_CODE && | ||
err.code !== UNKNOWN_REPL_WRITE_CONCERN_CODE | ||
); | ||
} | ||
function attemptTransactionCommit(session, startTime, fn, options) { | ||
return session.commitTransaction().catch(err => { | ||
if (err instanceof MongoError && hasNotTimedOut(startTime, MAX_WITH_TRANSACTION_TIMEOUT)) { | ||
if (err.hasErrorLabel('UnknownTransactionCommitResult')) { | ||
return attemptTransactionCommit(session, startTime, fn, options); | ||
} | ||
if (err.hasErrorLabel('TransientTransactionError')) { | ||
return attemptTransaction(session, startTime, fn, options); | ||
} | ||
} | ||
throw err; | ||
}); | ||
} | ||
const USER_EXPLICIT_TXN_END_STATES = new Set([ | ||
TxnState.NO_TRANSACTION, | ||
TxnState.TRANSACTION_COMMITTED, | ||
TxnState.TRANSACTION_ABORTED | ||
]); | ||
function userExplicitlyEndedTransaction(session) { | ||
return USER_EXPLICIT_TXN_END_STATES.has(session.transaction.state); | ||
} | ||
function attemptTransaction(session, startTime, fn, options) { | ||
session.startTransaction(options); | ||
let promise; | ||
try { | ||
promise = fn(session); | ||
} catch (err) { | ||
promise = Promise.reject(err); | ||
} | ||
if (!isPromiseLike(promise)) { | ||
session.abortTransaction(); | ||
throw new TypeError('Function provided to `withTransaction` must return a Promise'); | ||
} | ||
return promise | ||
.then(() => { | ||
if (userExplicitlyEndedTransaction(session)) { | ||
return; | ||
} | ||
return attemptTransactionCommit(session, startTime, fn, options); | ||
}) | ||
.catch(err => { | ||
function maybeRetryOrThrow(err) { | ||
if ( | ||
err instanceof MongoError && | ||
err.hasErrorLabel('TransientTransactionError') && | ||
hasNotTimedOut(startTime, MAX_WITH_TRANSACTION_TIMEOUT) | ||
) { | ||
return attemptTransaction(session, startTime, fn, options); | ||
} | ||
throw err; | ||
} | ||
if (session.transaction.isActive) { | ||
return session.abortTransaction().then(() => maybeRetryOrThrow(err)); | ||
} | ||
return maybeRetryOrThrow(err); | ||
}); | ||
} | ||
function endTransaction(session, commandName, callback) { | ||
@@ -305,8 +421,17 @@ if (!assertAlive(session, callback)) { | ||
// apply a writeConcern if specified | ||
let writeConcern; | ||
if (session.transaction.options.writeConcern) { | ||
Object.assign(command, { writeConcern: session.transaction.options.writeConcern }); | ||
writeConcern = Object.assign({}, session.transaction.options.writeConcern); | ||
} else if (session.clientOptions && session.clientOptions.w) { | ||
Object.assign(command, { writeConcern: { w: session.clientOptions.w } }); | ||
writeConcern = { w: session.clientOptions.w }; | ||
} | ||
if (txnState === TxnState.TRANSACTION_COMMITTED) { | ||
writeConcern = Object.assign({ wtimeout: 10000 }, writeConcern, { w: 'majority' }); | ||
} | ||
if (writeConcern) { | ||
Object.assign(command, { writeConcern }); | ||
} | ||
function commandHandler(e, r) { | ||
@@ -331,3 +456,8 @@ if (commandName === 'commitTransaction') { | ||
e.errorLabels.push('UnknownTransactionCommitResult'); | ||
if (isUnknownTransactionCommitResult(e)) { | ||
e.errorLabels.push('UnknownTransactionCommitResult'); | ||
// per txns spec, must unpin session in this case | ||
session.transaction.unpinServer(); | ||
} | ||
} | ||
@@ -346,5 +476,19 @@ } else { | ||
if (session.transaction.recoveryToken) { | ||
command.recoveryToken = session.transaction.recoveryToken; | ||
} | ||
// send the command | ||
session.topology.command('admin.$cmd', command, { session }, (err, reply) => { | ||
if (err && isRetryableError(err)) { | ||
// SPEC-1185: apply majority write concern when retrying commitTransaction | ||
if (command.commitTransaction) { | ||
// per txns spec, must unpin session in this case | ||
session.transaction.unpinServer(); | ||
command.writeConcern = Object.assign({ wtimeout: 10000 }, command.writeConcern, { | ||
w: 'majority' | ||
}); | ||
} | ||
return session.topology.command('admin.$cmd', command, { session }, (_err, _reply) => | ||
@@ -351,0 +495,0 @@ commandHandler(transactionError(_err), _reply) |
@@ -40,4 +40,2 @@ 'use strict'; | ||
const defaultAuthProviders = require('../auth/defaultAuthProviders').defaultAuthProviders; | ||
// | ||
@@ -95,3 +93,2 @@ // States | ||
* @param {number} [options.socketTimeout=0] TCP Socket timeout setting | ||
* @param {boolean} [options.singleBufferSerializtion=true] Serialize into single buffer, trade of peak memory for serialization speed | ||
* @param {boolean} [options.ssl=false] Use SSL for connection | ||
@@ -175,5 +172,3 @@ * @param {boolean|function} [options.checkServerIdentity=true] Ensure we check server identify during SSL, set to false to disable checking. Only works for Node 0.12.x or higher. You can pass in a boolean or your own checkServerIdentity override function. | ||
// Client info | ||
clientInfo: createClientInfo(options), | ||
// Authentication context | ||
authenticationContexts: [] | ||
clientInfo: createClientInfo(options) | ||
}; | ||
@@ -200,5 +195,2 @@ | ||
// All the authProviders | ||
this.authProviders = options.authProviders || defaultAuthProviders(this.s.bson); | ||
// Disconnected state | ||
@@ -213,4 +205,2 @@ this.state = DISCONNECTED; | ||
this.disconnectedProxies = []; | ||
// Are we authenticating | ||
this.authenticating = false; | ||
// Index of proxy to run operations against | ||
@@ -280,4 +270,2 @@ this.index = 0; | ||
* Initiate server connect | ||
* @method | ||
* @param {array} [options.auth=null] Array of auth options to apply on connect | ||
*/ | ||
@@ -296,3 +284,2 @@ Mongos.prototype.connect = function(options) { | ||
Object.assign({}, self.s.options, x, options, { | ||
authProviders: self.authProviders, | ||
reconnect: false, | ||
@@ -316,2 +303,12 @@ monitoring: false, | ||
/** | ||
* Authenticate the topology. | ||
* @method | ||
* @param {MongoCredentials} credentials The credentials for authentication we are using | ||
* @param {authResultCallback} callback A callback function | ||
*/ | ||
Mongos.prototype.auth = function(credentials, callback) { | ||
callback(null, null); | ||
}; | ||
function handleEvent(self) { | ||
@@ -349,56 +346,53 @@ return function() { | ||
if (event === 'connect') { | ||
// Do we have authentication contexts that need to be applied | ||
applyAuthenticationContexts(self, _this, function() { | ||
// Get last known ismaster | ||
self.ismaster = _this.lastIsMaster(); | ||
// Get last known ismaster | ||
self.ismaster = _this.lastIsMaster(); | ||
// Is this not a proxy, remove t | ||
if (self.ismaster.msg === 'isdbgrid') { | ||
// Add to the connectd list | ||
for (var i = 0; i < self.connectedProxies.length; i++) { | ||
if (self.connectedProxies[i].name === _this.name) { | ||
// Move from connectingProxies | ||
moveServerFrom(self.connectingProxies, self.disconnectedProxies, _this); | ||
// Emit the initial topology | ||
emitTopologyDescriptionChanged(self); | ||
_this.destroy(); | ||
return self.emit('failed', _this); | ||
} | ||
// Is this not a proxy, remove t | ||
if (self.ismaster.msg === 'isdbgrid') { | ||
// Add to the connectd list | ||
for (let i = 0; i < self.connectedProxies.length; i++) { | ||
if (self.connectedProxies[i].name === _this.name) { | ||
// Move from connectingProxies | ||
moveServerFrom(self.connectingProxies, self.disconnectedProxies, _this); | ||
// Emit the initial topology | ||
emitTopologyDescriptionChanged(self); | ||
_this.destroy(); | ||
return self.emit('failed', _this); | ||
} | ||
} | ||
// Remove the handlers | ||
for (i = 0; i < handlers.length; i++) { | ||
_this.removeAllListeners(handlers[i]); | ||
} | ||
// Remove the handlers | ||
for (let i = 0; i < handlers.length; i++) { | ||
_this.removeAllListeners(handlers[i]); | ||
} | ||
// Add stable state handlers | ||
_this.on('error', handleEvent(self, 'error')); | ||
_this.on('close', handleEvent(self, 'close')); | ||
_this.on('timeout', handleEvent(self, 'timeout')); | ||
_this.on('parseError', handleEvent(self, 'parseError')); | ||
// Add stable state handlers | ||
_this.on('error', handleEvent(self, 'error')); | ||
_this.on('close', handleEvent(self, 'close')); | ||
_this.on('timeout', handleEvent(self, 'timeout')); | ||
_this.on('parseError', handleEvent(self, 'parseError')); | ||
// Move from connecting proxies connected | ||
moveServerFrom(self.connectingProxies, self.connectedProxies, _this); | ||
// Emit the joined event | ||
self.emit('joined', 'mongos', _this); | ||
} else { | ||
// Print warning if we did not find a mongos proxy | ||
if (self.s.logger.isWarn()) { | ||
var message = 'expected mongos proxy, but found replicaset member mongod for server %s'; | ||
// We have a standalone server | ||
if (!self.ismaster.hosts) { | ||
message = 'expected mongos proxy, but found standalone mongod for server %s'; | ||
} | ||
self.s.logger.warn(f(message, _this.name)); | ||
// Move from connecting proxies connected | ||
moveServerFrom(self.connectingProxies, self.connectedProxies, _this); | ||
// Emit the joined event | ||
self.emit('joined', 'mongos', _this); | ||
} else { | ||
// Print warning if we did not find a mongos proxy | ||
if (self.s.logger.isWarn()) { | ||
var message = 'expected mongos proxy, but found replicaset member mongod for server %s'; | ||
// We have a standalone server | ||
if (!self.ismaster.hosts) { | ||
message = 'expected mongos proxy, but found standalone mongod for server %s'; | ||
} | ||
// This is not a mongos proxy, remove it completely | ||
removeProxyFrom(self.connectingProxies, _this); | ||
// Emit the left event | ||
self.emit('left', 'server', _this); | ||
// Emit failed event | ||
self.emit('failed', _this); | ||
self.s.logger.warn(f(message, _this.name)); | ||
} | ||
}); | ||
// This is not a mongos proxy, remove it completely | ||
removeProxyFrom(self.connectingProxies, _this); | ||
// Emit the left event | ||
self.emit('left', 'server', _this); | ||
// Emit failed event | ||
self.emit('failed', _this); | ||
} | ||
} else { | ||
@@ -482,3 +476,14 @@ moveServerFrom(self.connectingProxies, self.disconnectedProxies, this); | ||
function pickProxy(self) { | ||
function pickProxy(self, session) { | ||
// TODO: Destructure :) | ||
const transaction = session && session.transaction; | ||
if (transaction && transaction.server) { | ||
if (transaction.server.isConnected()) { | ||
return transaction.server; | ||
} else { | ||
transaction.unpinServer(); | ||
} | ||
} | ||
// Get the currently connected Proxies | ||
@@ -507,11 +512,18 @@ var connectedProxies = self.connectedProxies.slice(0); | ||
let proxy; | ||
// We have no connectedProxies pick first of the connected ones | ||
if (connectedProxies.length === 0) { | ||
return self.connectedProxies[0]; | ||
proxy = self.connectedProxies[0]; | ||
} else { | ||
// Get proxy | ||
proxy = connectedProxies[self.index % connectedProxies.length]; | ||
// Update the index | ||
self.index = (self.index + 1) % connectedProxies.length; | ||
} | ||
// Get proxy | ||
var proxy = connectedProxies[self.index % connectedProxies.length]; | ||
// Update the index | ||
self.index = (self.index + 1) % connectedProxies.length; | ||
if (transaction && transaction.isActive && proxy && proxy.isConnected()) { | ||
transaction.pinServer(proxy); | ||
} | ||
// Return the proxy | ||
@@ -561,29 +573,26 @@ return proxy; | ||
if (event === 'connect' && !self.authenticating) { | ||
// Do we have authentication contexts that need to be applied | ||
applyAuthenticationContexts(self, _self, function() { | ||
// Destroyed | ||
if (self.state === DESTROYED || self.state === UNREFERENCED) { | ||
moveServerFrom(self.connectingProxies, self.disconnectedProxies, _self); | ||
return _self.destroy(); | ||
} | ||
if (event === 'connect') { | ||
// Destroyed | ||
if (self.state === DESTROYED || self.state === UNREFERENCED) { | ||
moveServerFrom(self.connectingProxies, self.disconnectedProxies, _self); | ||
return _self.destroy(); | ||
} | ||
// Remove the handlers | ||
for (var i = 0; i < handlers.length; i++) { | ||
_self.removeAllListeners(handlers[i]); | ||
} | ||
// Remove the handlers | ||
for (var i = 0; i < handlers.length; i++) { | ||
_self.removeAllListeners(handlers[i]); | ||
} | ||
// Add stable state handlers | ||
_self.on('error', handleEvent(self, 'error')); | ||
_self.on('close', handleEvent(self, 'close')); | ||
_self.on('timeout', handleEvent(self, 'timeout')); | ||
_self.on('parseError', handleEvent(self, 'parseError')); | ||
// Add stable state handlers | ||
_self.on('error', handleEvent(self, 'error')); | ||
_self.on('close', handleEvent(self, 'close')); | ||
_self.on('timeout', handleEvent(self, 'timeout')); | ||
_self.on('parseError', handleEvent(self, 'parseError')); | ||
// Move to the connected servers | ||
moveServerFrom(self.connectingProxies, self.connectedProxies, _self); | ||
// Emit topology Change | ||
emitTopologyDescriptionChanged(self); | ||
// Emit joined event | ||
self.emit('joined', 'mongos', _self); | ||
}); | ||
// Move to the connected servers | ||
moveServerFrom(self.connectingProxies, self.connectedProxies, _self); | ||
// Emit topology Change | ||
emitTopologyDescriptionChanged(self); | ||
// Emit joined event | ||
self.emit('joined', 'mongos', _self); | ||
} else { | ||
@@ -620,3 +629,2 @@ // Move from connectingProxies | ||
port: parseInt(_server.name.split(':')[1], 10), | ||
authProviders: self.authProviders, | ||
reconnect: false, | ||
@@ -663,31 +671,2 @@ monitoring: false, | ||
function applyAuthenticationContexts(self, server, callback) { | ||
if (self.s.authenticationContexts.length === 0) { | ||
return callback(); | ||
} | ||
// Copy contexts to ensure no modificiation in the middle of | ||
// auth process. | ||
var authContexts = self.s.authenticationContexts.slice(0); | ||
// Apply one of the contexts | ||
function applyAuth(authContexts, server, callback) { | ||
if (authContexts.length === 0) return callback(); | ||
// Get the first auth context | ||
var authContext = authContexts.shift(); | ||
// Copy the params | ||
var customAuthContext = authContext.slice(0); | ||
// Push our callback handler | ||
customAuthContext.push(function(/* err */) { | ||
applyAuth(authContexts, server, callback); | ||
}); | ||
// Attempt authentication | ||
server.auth.apply(server, customAuthContext); | ||
} | ||
// Apply all auth contexts | ||
applyAuth(authContexts, server, callback); | ||
} | ||
function topologyMonitor(self, options) { | ||
@@ -854,4 +833,2 @@ options = options || {}; | ||
if (this.haTimeoutId) clearTimeout(this.haTimeoutId); | ||
// Clear out authentication contexts | ||
this.s.authenticationContexts = []; | ||
@@ -899,24 +876,27 @@ // Destroy all connecting servers | ||
// Execute write operation | ||
var executeWriteOperation = function(self, op, ns, ops, options, callback) { | ||
function executeWriteOperation(args, options, callback) { | ||
if (typeof options === 'function') (callback = options), (options = {}); | ||
options = options || {}; | ||
// TODO: once we drop Node 4, use destructuring either here or in arguments. | ||
const self = args.self; | ||
const op = args.op; | ||
const ns = args.ns; | ||
const ops = args.ops; | ||
// Pick a server | ||
let server = pickProxy(self); | ||
let server = pickProxy(self, options.session); | ||
// No server found error out | ||
if (!server) return callback(new MongoError('no mongos proxy available')); | ||
if (!options.retryWrites || !options.session || !isRetryableWritesSupported(self)) { | ||
// Execute the command | ||
return server[op](ns, ops, options, callback); | ||
} | ||
const willRetryWrite = | ||
!args.retrying && | ||
!!options.retryWrites && | ||
options.session && | ||
isRetryableWritesSupported(self) && | ||
!options.session.inTransaction(); | ||
// increment and assign txnNumber | ||
options.willRetryWrite = true; | ||
options.session.incrementTransactionNumber(); | ||
server[op](ns, ops, options, (err, result) => { | ||
const handler = (err, result) => { | ||
if (!err) return callback(null, result); | ||
if (!isRetryableError(err)) { | ||
if (!isRetryableError(err) || !willRetryWrite) { | ||
return callback(err); | ||
@@ -926,14 +906,27 @@ } | ||
// Pick another server | ||
server = pickProxy(self); | ||
server = pickProxy(self, options.session); | ||
// No server found error out with original error | ||
if (!server || !isRetryableWritesSupported(server)) { | ||
if (!server) { | ||
return callback(err); | ||
} | ||
// rerun the operation | ||
server[op](ns, ops, options, callback); | ||
}); | ||
}; | ||
const newArgs = Object.assign({}, args, { retrying: true }); | ||
return executeWriteOperation(newArgs, options, callback); | ||
}; | ||
if (callback.operationId) { | ||
handler.operationId = callback.operationId; | ||
} | ||
// increment and assign txnNumber | ||
if (willRetryWrite) { | ||
options.session.incrementTransactionNumber(); | ||
options.willRetryWrite = willRetryWrite; | ||
} | ||
// rerun the operation | ||
server[op](ns, ops, options, handler); | ||
} | ||
/** | ||
@@ -970,3 +963,3 @@ * Insert one or more documents | ||
// Execute write operation | ||
executeWriteOperation(this, 'insert', ns, ops, options, callback); | ||
executeWriteOperation({ self: this, op: 'insert', ns, ops }, options, callback); | ||
}; | ||
@@ -1005,3 +998,3 @@ | ||
// Execute write operation | ||
executeWriteOperation(this, 'update', ns, ops, options, callback); | ||
executeWriteOperation({ self: this, op: 'update', ns, ops }, options, callback); | ||
}; | ||
@@ -1040,3 +1033,3 @@ | ||
// Execute write operation | ||
executeWriteOperation(this, 'remove', ns, ops, options, callback); | ||
executeWriteOperation({ self: this, op: 'remove', ns, ops }, options, callback); | ||
}; | ||
@@ -1071,3 +1064,3 @@ | ||
// Pick a proxy | ||
var server = pickProxy(self); | ||
var server = pickProxy(self, options.session); | ||
@@ -1148,160 +1141,2 @@ // Topology is not connected, save the call in the provided store to be | ||
/** | ||
* Authenticate using a specified mechanism | ||
* @method | ||
* @param {string} mechanism The Auth mechanism we are invoking | ||
* @param {string} db The db we are invoking the mechanism against | ||
* @param {...object} param Parameters for the specific mechanism | ||
* @param {authResultCallback} callback A callback function | ||
*/ | ||
Mongos.prototype.auth = function(mechanism, db) { | ||
var allArgs = Array.prototype.slice.call(arguments, 0).slice(0); | ||
var self = this; | ||
var args = Array.prototype.slice.call(arguments, 2); | ||
var callback = args.pop(); | ||
var currentContextIndex = 0; | ||
// If we don't have the mechanism fail | ||
if (this.authProviders[mechanism] == null && mechanism !== 'default') { | ||
return callback(new MongoError(f('auth provider %s does not exist', mechanism))); | ||
} | ||
// Are we already authenticating, throw | ||
if (this.authenticating) { | ||
return callback(new MongoError('authentication or logout allready in process')); | ||
} | ||
// Topology is not connected, save the call in the provided store to be | ||
// Executed at some point when the handler deems it's reconnected | ||
if (!self.isConnected() && self.s.disconnectHandler != null) { | ||
return self.s.disconnectHandler.add('auth', db, allArgs, {}, callback); | ||
} | ||
// Set to authenticating | ||
this.authenticating = true; | ||
// All errors | ||
var errors = []; | ||
// Get all the servers | ||
var servers = this.connectedProxies.slice(0); | ||
// No servers return | ||
if (servers.length === 0) { | ||
this.authenticating = false; | ||
callback(null, true); | ||
} | ||
// Authenticate | ||
function auth(server) { | ||
// Arguments without a callback | ||
var argsWithoutCallback = [mechanism, db].concat(args.slice(0)); | ||
// Create arguments | ||
var finalArguments = argsWithoutCallback.concat([ | ||
function(err) { | ||
count = count - 1; | ||
// Save all the errors | ||
if (err) errors.push({ name: server.name, err: err }); | ||
// We are done | ||
if (count === 0) { | ||
// Auth is done | ||
self.authenticating = false; | ||
// Return the auth error | ||
if (errors.length) { | ||
// Remove the entry from the stored authentication contexts | ||
self.s.authenticationContexts.splice(currentContextIndex, 0); | ||
// Return error | ||
return callback( | ||
new MongoError({ | ||
message: 'authentication fail', | ||
errors: errors | ||
}), | ||
false | ||
); | ||
} | ||
// Successfully authenticated session | ||
callback(null, self); | ||
} | ||
} | ||
]); | ||
// Execute the auth only against non arbiter servers | ||
if (!server.lastIsMaster().arbiterOnly) { | ||
server.auth.apply(server, finalArguments); | ||
} | ||
} | ||
// Save current context index | ||
currentContextIndex = this.s.authenticationContexts.length; | ||
// Store the auth context and return the last index | ||
this.s.authenticationContexts.push([mechanism, db].concat(args.slice(0))); | ||
// Get total count | ||
var count = servers.length; | ||
// Authenticate against all servers | ||
while (servers.length > 0) { | ||
auth(servers.shift()); | ||
} | ||
}; | ||
/** | ||
* Logout from a database | ||
* @method | ||
* @param {string} db The db we are logging out from | ||
* @param {authResultCallback} callback A callback function | ||
*/ | ||
Mongos.prototype.logout = function(dbName, callback) { | ||
var self = this; | ||
// Are we authenticating or logging out, throw | ||
if (this.authenticating) { | ||
throw new MongoError('authentication or logout allready in process'); | ||
} | ||
// Ensure no new members are processed while logging out | ||
this.authenticating = true; | ||
// Remove from all auth providers (avoid any reaplication of the auth details) | ||
var providers = Object.keys(this.authProviders); | ||
for (var i = 0; i < providers.length; i++) { | ||
this.authProviders[providers[i]].logout(dbName); | ||
} | ||
// Now logout all the servers | ||
var servers = this.connectedProxies.slice(0); | ||
var count = servers.length; | ||
if (count === 0) return callback(); | ||
var errors = []; | ||
function logoutServer(_server, cb) { | ||
_server.logout(dbName, function(err) { | ||
if (err) errors.push({ name: _server.name, err: err }); | ||
cb(); | ||
}); | ||
} | ||
// Execute logout on all server instances | ||
for (i = 0; i < servers.length; i++) { | ||
logoutServer(servers[i], function() { | ||
count = count - 1; | ||
if (count === 0) { | ||
// Do not block new operations | ||
self.authenticating = false; | ||
// If we have one or more errors | ||
if (errors.length) | ||
return callback( | ||
new MongoError({ | ||
message: f('logout failed against db %s', dbName), | ||
errors: errors | ||
}), | ||
false | ||
); | ||
// No errors | ||
callback(); | ||
} | ||
}); | ||
} | ||
}; | ||
/** | ||
* Selects a server | ||
@@ -1311,3 +1146,4 @@ * | ||
* @param {function} selector Unused | ||
* @param {ReadPreference} [options.readPreference] Specify read preference if command supports it | ||
* @param {ReadPreference} [options.readPreference] Unused | ||
* @param {ClientSession} [options.session] Specify a session if it is being used | ||
* @param {function} callback | ||
@@ -1322,3 +1158,3 @@ */ | ||
const server = pickProxy(this); | ||
const server = pickProxy(this, options.session); | ||
if (this.s.debug) this.emit('pickedServer', null, server); | ||
@@ -1325,0 +1161,0 @@ callback(null, server); |
@@ -290,16 +290,14 @@ 'use strict'; | ||
if (ismaster && ismaster.msg === 'isdbgrid') { | ||
if (this.primary && this.primary.name === serverName) { | ||
this.primary = null; | ||
this.topologyType = TopologyType.ReplicaSetNoPrimary; | ||
} | ||
return false; | ||
} | ||
// A RSOther instance | ||
if ( | ||
(ismaster.setName && ismaster.hidden) || | ||
(ismaster.setName && | ||
!ismaster.ismaster && | ||
!ismaster.secondary && | ||
!ismaster.arbiterOnly && | ||
!ismaster.passive) | ||
) { | ||
// A RSGhost instance | ||
if (ismaster.isreplicaset) { | ||
self.set[serverName] = { | ||
type: ServerType.RSOther, | ||
type: ServerType.RSGhost, | ||
setVersion: null, | ||
@@ -309,2 +307,7 @@ electionId: null, | ||
}; | ||
if (this.primary && this.primary.name === serverName) { | ||
this.primary = null; | ||
} | ||
// Set the topology | ||
@@ -315,12 +318,21 @@ this.topologyType = this.primary | ||
if (ismaster.setName) this.setName = ismaster.setName; | ||
// Set the topology | ||
return false; | ||
} | ||
// A RSGhost instance | ||
if (ismaster.isreplicaset) { | ||
// A RSOther instance | ||
if ( | ||
(ismaster.setName && ismaster.hidden) || | ||
(ismaster.setName && | ||
!ismaster.ismaster && | ||
!ismaster.secondary && | ||
!ismaster.arbiterOnly && | ||
!ismaster.passive) | ||
) { | ||
self.set[serverName] = { | ||
type: ServerType.RSGhost, | ||
type: ServerType.RSOther, | ||
setVersion: null, | ||
electionId: null, | ||
setName: null | ||
setName: ismaster.setName | ||
}; | ||
@@ -333,4 +345,2 @@ | ||
if (ismaster.setName) this.setName = ismaster.setName; | ||
// Set the topology | ||
return false; | ||
@@ -337,0 +347,0 @@ } |
@@ -21,7 +21,4 @@ 'use strict'; | ||
const isRetryableError = require('../error').isRetryableError; | ||
const BSON = retrieveBSON(); | ||
const defaultAuthProviders = require('../auth/defaultAuthProviders').defaultAuthProviders; | ||
var BSON = retrieveBSON(); | ||
// | ||
@@ -81,3 +78,2 @@ // States | ||
* @param {number} [options.socketTimeout=0] TCP Socket timeout setting | ||
* @param {boolean} [options.singleBufferSerializtion=true] Serialize into single buffer, trade of peak memory for serialization speed | ||
* @param {boolean} [options.ssl=false] Use SSL for connection | ||
@@ -196,5 +192,3 @@ * @param {boolean|function} [options.checkServerIdentity=true] Ensure we check server identify during SSL, set to false to disable checking. Only works for Node 0.12.x or higher. You can pass in a boolean or your own checkServerIdentity override function. | ||
// Client info | ||
clientInfo: createClientInfo(options), | ||
// Authentication context | ||
authenticationContexts: [] | ||
clientInfo: createClientInfo(options) | ||
}; | ||
@@ -223,5 +217,2 @@ | ||
// All the authProviders | ||
this.authProviders = options.authProviders || defaultAuthProviders(this.s.bson); | ||
// Add forwarding of events from state handler | ||
@@ -245,4 +236,2 @@ var types = ['joined', 'left']; | ||
this.haTimeoutId = null; | ||
// Are we authenticating | ||
this.authenticating = false; | ||
// Last ismaster | ||
@@ -309,3 +298,3 @@ this.ismaster = null; | ||
if (event === 'connect' && !self.authenticating) { | ||
if (event === 'connect') { | ||
// Destroyed | ||
@@ -316,40 +305,30 @@ if (self.state === DESTROYED || self.state === UNREFERENCED) { | ||
// Do we have authentication contexts that need to be applied | ||
applyAuthenticationContexts(self, _self, function() { | ||
// Destroy the instance | ||
if (self.state === DESTROYED || self.state === UNREFERENCED) { | ||
return _self.destroy({ force: true }); | ||
// Update the state | ||
var result = self.s.replicaSetState.update(_self); | ||
// Update the state with the new server | ||
if (result) { | ||
// Primary lastIsMaster store it | ||
if (_self.lastIsMaster() && _self.lastIsMaster().ismaster) { | ||
self.ismaster = _self.lastIsMaster(); | ||
} | ||
// Update the state | ||
var result = self.s.replicaSetState.update(_self); | ||
// Update the state with the new server | ||
if (result) { | ||
// Primary lastIsMaster store it | ||
if (_self.lastIsMaster() && _self.lastIsMaster().ismaster) { | ||
self.ismaster = _self.lastIsMaster(); | ||
} | ||
// Remove the handlers | ||
for (let i = 0; i < handlers.length; i++) { | ||
_self.removeAllListeners(handlers[i]); | ||
} | ||
// Remove the handlers | ||
for (var i = 0; i < handlers.length; i++) { | ||
_self.removeAllListeners(handlers[i]); | ||
} | ||
// Add stable state handlers | ||
_self.on('error', handleEvent(self, 'error')); | ||
_self.on('close', handleEvent(self, 'close')); | ||
_self.on('timeout', handleEvent(self, 'timeout')); | ||
_self.on('parseError', handleEvent(self, 'parseError')); | ||
// Add stable state handlers | ||
_self.on('error', handleEvent(self, 'error')); | ||
_self.on('close', handleEvent(self, 'close')); | ||
_self.on('timeout', handleEvent(self, 'timeout')); | ||
_self.on('parseError', handleEvent(self, 'parseError')); | ||
// Enalbe the monitoring of the new server | ||
monitorServer(_self.lastIsMaster().me, self, {}); | ||
// Enalbe the monitoring of the new server | ||
monitorServer(_self.lastIsMaster().me, self, {}); | ||
// Rexecute any stalled operation | ||
rexecuteOperations(self); | ||
} else { | ||
_self.destroy({ force: true }); | ||
} | ||
}); | ||
} else if (event === 'connect' && self.authenticating) { | ||
this.destroy({ force: true }); | ||
// Rexecute any stalled operation | ||
rexecuteOperations(self); | ||
} else { | ||
_self.destroy({ force: true }); | ||
} | ||
} else if (event === 'error') { | ||
@@ -385,3 +364,2 @@ error = err; | ||
port: parseInt(_server.split(':')[1], 10), | ||
authProviders: self.authProviders, | ||
reconnect: false, | ||
@@ -737,36 +715,2 @@ monitoring: false, | ||
function applyAuthenticationContexts(self, server, callback) { | ||
if (self.s.authenticationContexts.length === 0) { | ||
return callback(); | ||
} | ||
// Do not apply any auth contexts if it's an arbiter | ||
if (server.lastIsMaster() && server.lastIsMaster().arbiterOnly) { | ||
return callback(); | ||
} | ||
// Copy contexts to ensure no modificiation in the middle of | ||
// auth process. | ||
var authContexts = self.s.authenticationContexts.slice(0); | ||
// Apply one of the contexts | ||
function applyAuth(authContexts, server, callback) { | ||
if (authContexts.length === 0) return callback(); | ||
// Get the first auth context | ||
var authContext = authContexts.shift(); | ||
// Copy the params | ||
var customAuthContext = authContext.slice(0); | ||
// Push our callback handler | ||
customAuthContext.push(function(/* err */) { | ||
applyAuth(authContexts, server, callback); | ||
}); | ||
// Attempt authentication | ||
server.auth.apply(server, customAuthContext); | ||
} | ||
// Apply all auth contexts | ||
applyAuth(authContexts, server, callback); | ||
} | ||
function shouldTriggerConnect(self) { | ||
@@ -810,63 +754,55 @@ const isConnecting = self.state === CONNECTING; | ||
if (event === 'connect') { | ||
// Do we have authentication contexts that need to be applied | ||
applyAuthenticationContexts(self, _this, function() { | ||
// Destroy the instance | ||
if (self.state === DESTROYED || self.state === UNREFERENCED) { | ||
return _this.destroy({ force: true }); | ||
// Update the state | ||
var result = self.s.replicaSetState.update(_this); | ||
if (result === true) { | ||
// Primary lastIsMaster store it | ||
if (_this.lastIsMaster() && _this.lastIsMaster().ismaster) { | ||
self.ismaster = _this.lastIsMaster(); | ||
} | ||
// Update the state | ||
var result = self.s.replicaSetState.update(_this); | ||
if (result === true) { | ||
// Primary lastIsMaster store it | ||
if (_this.lastIsMaster() && _this.lastIsMaster().ismaster) { | ||
self.ismaster = _this.lastIsMaster(); | ||
} | ||
// Debug log | ||
if (self.s.logger.isDebug()) { | ||
self.s.logger.debug( | ||
f( | ||
'handleInitialConnectEvent %s from server %s in replset with id %s has state [%s]', | ||
event, | ||
_this.name, | ||
self.id, | ||
JSON.stringify(self.s.replicaSetState.set) | ||
) | ||
); | ||
} | ||
// Debug log | ||
if (self.s.logger.isDebug()) { | ||
self.s.logger.debug( | ||
f( | ||
'handleInitialConnectEvent %s from server %s in replset with id %s has state [%s]', | ||
event, | ||
_this.name, | ||
self.id, | ||
JSON.stringify(self.s.replicaSetState.set) | ||
) | ||
); | ||
} | ||
// Remove the handlers | ||
for (let i = 0; i < handlers.length; i++) { | ||
_this.removeAllListeners(handlers[i]); | ||
} | ||
// Remove the handlers | ||
for (var i = 0; i < handlers.length; i++) { | ||
_this.removeAllListeners(handlers[i]); | ||
} | ||
// Add stable state handlers | ||
_this.on('error', handleEvent(self, 'error')); | ||
_this.on('close', handleEvent(self, 'close')); | ||
_this.on('timeout', handleEvent(self, 'timeout')); | ||
_this.on('parseError', handleEvent(self, 'parseError')); | ||
// Add stable state handlers | ||
_this.on('error', handleEvent(self, 'error')); | ||
_this.on('close', handleEvent(self, 'close')); | ||
_this.on('timeout', handleEvent(self, 'timeout')); | ||
_this.on('parseError', handleEvent(self, 'parseError')); | ||
// Do we have a primary or primaryAndSecondary | ||
if (shouldTriggerConnect(self)) { | ||
// We are connected | ||
self.state = CONNECTED; | ||
// Do we have a primary or primaryAndSecondary | ||
if (shouldTriggerConnect(self)) { | ||
// We are connected | ||
self.state = CONNECTED; | ||
// Set initial connect state | ||
self.initialConnectState.connect = true; | ||
// Emit connect event | ||
process.nextTick(function() { | ||
self.emit('connect', self); | ||
}); | ||
// Set initial connect state | ||
self.initialConnectState.connect = true; | ||
// Emit connect event | ||
process.nextTick(function() { | ||
self.emit('connect', self); | ||
}); | ||
topologyMonitor(self, {}); | ||
} | ||
} else if (result instanceof MongoError) { | ||
_this.destroy({ force: true }); | ||
self.destroy({ force: true }); | ||
return self.emit('error', result); | ||
} else { | ||
_this.destroy({ force: true }); | ||
topologyMonitor(self, {}); | ||
} | ||
}); | ||
} else if (result instanceof MongoError) { | ||
_this.destroy({ force: true }); | ||
self.destroy({ force: true }); | ||
return self.emit('error', result); | ||
} else { | ||
_this.destroy({ force: true }); | ||
} | ||
} else { | ||
@@ -966,4 +902,2 @@ // Emit failure to connect | ||
* Initiate server connect | ||
* @method | ||
* @param {array} [options.auth=null] Array of auth options to apply on connect | ||
*/ | ||
@@ -982,3 +916,2 @@ ReplSet.prototype.connect = function(options) { | ||
Object.assign({}, self.s.options, x, options, { | ||
authProviders: self.authProviders, | ||
reconnect: false, | ||
@@ -1016,2 +949,12 @@ monitoring: false, | ||
/** | ||
* Authenticate the topology. | ||
* @method | ||
* @param {MongoCredentials} credentials The credentials for authentication we are using | ||
* @param {authResultCallback} callback A callback function | ||
*/ | ||
ReplSet.prototype.auth = function(credentials, callback) { | ||
callback(null, null); | ||
}; | ||
/** | ||
* Destroy the server connection | ||
@@ -1029,4 +972,2 @@ * @param {boolean} [options.force=false] Force destroy the pool | ||
this.s.replicaSetState.destroy(options); | ||
// Clear out authentication contexts | ||
this.s.authenticationContexts = []; | ||
@@ -1110,6 +1051,2 @@ // Destroy all connecting servers | ||
// If we are authenticating signal not connected | ||
// To avoid interleaving of operations | ||
if (this.authenticating) return false; | ||
// If we specified a read preference check if we are connected to something | ||
@@ -1155,2 +1092,3 @@ // than can satisfy this | ||
* @param {ReadPreference} [options.readPreference] Specify read preference if command supports it | ||
* @param {ClientSession} [options.session] Unused | ||
* @param {function} callback | ||
@@ -1403,177 +1341,2 @@ */ | ||
/** | ||
* Authenticate using a specified mechanism | ||
* @method | ||
* @param {string} mechanism The Auth mechanism we are invoking | ||
* @param {string} db The db we are invoking the mechanism against | ||
* @param {...object} param Parameters for the specific mechanism | ||
* @param {authResultCallback} callback A callback function | ||
*/ | ||
ReplSet.prototype.auth = function(mechanism, db) { | ||
var allArgs = Array.prototype.slice.call(arguments, 0).slice(0); | ||
var self = this; | ||
var args = Array.prototype.slice.call(arguments, 2); | ||
var callback = args.pop(); | ||
var currentContextIndex = 0; | ||
// If we don't have the mechanism fail | ||
if (this.authProviders[mechanism] == null && mechanism !== 'default') { | ||
return callback(new MongoError(f('auth provider %s does not exist', mechanism))); | ||
} | ||
// Are we already authenticating, throw | ||
if (this.authenticating) { | ||
return callback(new MongoError('authentication or logout allready in process')); | ||
} | ||
// Topology is not connected, save the call in the provided store to be | ||
// Executed at some point when the handler deems it's reconnected | ||
if (!this.isConnected() && self.s.disconnectHandler != null) { | ||
if (!self.s.replicaSetState.hasPrimary() && !self.s.options.secondaryOnlyConnectionAllowed) { | ||
return self.s.disconnectHandler.add('auth', db, allArgs, {}, callback); | ||
} else if ( | ||
!self.s.replicaSetState.hasSecondary() && | ||
self.s.options.secondaryOnlyConnectionAllowed | ||
) { | ||
return self.s.disconnectHandler.add('auth', db, allArgs, {}, callback); | ||
} | ||
} | ||
// Set to authenticating | ||
this.authenticating = true; | ||
// All errors | ||
var errors = []; | ||
// Get all the servers | ||
var servers = this.s.replicaSetState.allServers(); | ||
// No servers return | ||
if (servers.length === 0) { | ||
this.authenticating = false; | ||
callback(null, true); | ||
} | ||
// Authenticate | ||
function auth(server) { | ||
// Arguments without a callback | ||
var argsWithoutCallback = [mechanism, db].concat(args.slice(0)); | ||
// Create arguments | ||
var finalArguments = argsWithoutCallback.concat([ | ||
function(err) { | ||
count = count - 1; | ||
// Save all the errors | ||
if (err) errors.push({ name: server.name, err: err }); | ||
// We are done | ||
if (count === 0) { | ||
// Auth is done | ||
self.authenticating = false; | ||
// Return the auth error | ||
if (errors.length) { | ||
// Remove the entry from the stored authentication contexts | ||
self.s.authenticationContexts.splice(currentContextIndex, 0); | ||
// Return error | ||
return callback( | ||
new MongoError({ | ||
message: 'authentication fail', | ||
errors: errors | ||
}), | ||
false | ||
); | ||
} | ||
// Successfully authenticated session | ||
callback(null, self); | ||
} | ||
} | ||
]); | ||
if (!server.lastIsMaster().arbiterOnly) { | ||
// Execute the auth only against non arbiter servers | ||
server.auth.apply(server, finalArguments); | ||
} else { | ||
// If we are authenticating against an arbiter just ignore it | ||
finalArguments.pop()(null); | ||
} | ||
} | ||
// Get total count | ||
var count = servers.length; | ||
// Save current context index | ||
currentContextIndex = this.s.authenticationContexts.length; | ||
// Store the auth context and return the last index | ||
this.s.authenticationContexts.push([mechanism, db].concat(args.slice(0))); | ||
// Authenticate against all servers | ||
while (servers.length > 0) { | ||
auth(servers.shift()); | ||
} | ||
}; | ||
/** | ||
* Logout from a database | ||
* @method | ||
* @param {string} db The db we are logging out from | ||
* @param {authResultCallback} callback A callback function | ||
*/ | ||
ReplSet.prototype.logout = function(dbName, callback) { | ||
var self = this; | ||
// Are we authenticating or logging out, throw | ||
if (this.authenticating) { | ||
throw new MongoError('authentication or logout allready in process'); | ||
} | ||
// Ensure no new members are processed while logging out | ||
this.authenticating = true; | ||
// Remove from all auth providers (avoid any reaplication of the auth details) | ||
var providers = Object.keys(this.authProviders); | ||
for (var i = 0; i < providers.length; i++) { | ||
this.authProviders[providers[i]].logout(dbName); | ||
} | ||
// Clear out any contexts associated with the db | ||
self.s.authenticationContexts = self.s.authenticationContexts.filter(function(context) { | ||
return context[1] !== dbName; | ||
}); | ||
// Now logout all the servers | ||
var servers = this.s.replicaSetState.allServers(); | ||
var count = servers.length; | ||
if (count === 0) return callback(); | ||
var errors = []; | ||
function logoutServer(_server, cb) { | ||
_server.logout(dbName, function(err) { | ||
if (err) errors.push({ name: _server.name, err: err }); | ||
cb(); | ||
}); | ||
} | ||
// Execute logout on all server instances | ||
for (i = 0; i < servers.length; i++) { | ||
logoutServer(servers[i], function() { | ||
count = count - 1; | ||
if (count === 0) { | ||
// Do not block new operations | ||
self.authenticating = false; | ||
// If we have one or more errors | ||
if (errors.length) | ||
return callback( | ||
new MongoError({ | ||
message: f('logout failed against db %s', dbName), | ||
errors: errors | ||
}), | ||
false | ||
); | ||
// No errors | ||
callback(); | ||
} | ||
}); | ||
} | ||
}; | ||
/** | ||
* Get a new cursor | ||
@@ -1580,0 +1343,0 @@ * @method |
@@ -11,7 +11,5 @@ 'use strict'; | ||
Pool = require('../connection/pool'), | ||
Query = require('../connection/commands').Query, | ||
MongoError = require('../error').MongoError, | ||
MongoNetworkError = require('../error').MongoNetworkError, | ||
TwoSixWireProtocolSupport = require('../wireprotocol/2_6_support'), | ||
ThreeTwoWireProtocolSupport = require('../wireprotocol/3_2_support'), | ||
wireProtocol = require('../wireprotocol'), | ||
BasicCursor = require('../cursor'), | ||
@@ -27,53 +25,2 @@ sdam = require('./shared'), | ||
function getSaslSupportedMechs(options) { | ||
if (!options) { | ||
return {}; | ||
} | ||
const authArray = options.auth || []; | ||
const authMechanism = authArray[0] || options.authMechanism; | ||
const authSource = authArray[1] || options.authSource || options.dbName || 'admin'; | ||
const user = authArray[2] || options.user; | ||
if (typeof authMechanism === 'string' && authMechanism.toUpperCase() !== 'DEFAULT') { | ||
return {}; | ||
} | ||
if (!user) { | ||
return {}; | ||
} | ||
return { saslSupportedMechs: `${authSource}.${user}` }; | ||
} | ||
function getDefaultAuthMechanism(ismaster) { | ||
if (ismaster) { | ||
// If ismaster contains saslSupportedMechs, use scram-sha-256 | ||
// if it is available, else scram-sha-1 | ||
if (Array.isArray(ismaster.saslSupportedMechs)) { | ||
return ismaster.saslSupportedMechs.indexOf('SCRAM-SHA-256') >= 0 | ||
? 'scram-sha-256' | ||
: 'scram-sha-1'; | ||
} | ||
// Fallback to legacy selection method. If wire version >= 3, use scram-sha-1 | ||
if (ismaster.maxWireVersion >= 3) { | ||
return 'scram-sha-1'; | ||
} | ||
} | ||
// Default for wireprotocol < 3 | ||
return 'mongocr'; | ||
} | ||
function extractIsMasterError(err, result) { | ||
if (err) { | ||
return err; | ||
} | ||
if (result && result.result && result.result.ok === 0) { | ||
return new MongoError(result.result); | ||
} | ||
} | ||
// Used for filtering out fields for loggin | ||
@@ -95,3 +42,2 @@ var debugFields = [ | ||
'socketTimeout', | ||
'singleBufferSerializtion', | ||
'ssl', | ||
@@ -231,5 +177,2 @@ 'ca', | ||
this.initialConnect = true; | ||
// Wire protocol handler, default to oldest known protocol handler | ||
// this gets changed when the first ismaster is called. | ||
this.wireProtocolHandler = new TwoSixWireProtocolSupport(); | ||
// Default type | ||
@@ -310,16 +253,2 @@ this._type = 'server'; | ||
function isSupportedServer(response) { | ||
return response && typeof response.maxWireVersion === 'number' && response.maxWireVersion >= 2; | ||
} | ||
function configureWireProtocolHandler(self, ismaster) { | ||
// 3.2 wire protocol handler | ||
if (ismaster.maxWireVersion >= 4) { | ||
return new ThreeTwoWireProtocolSupport(); | ||
} | ||
// default to 2.6 wire protocol handler | ||
return new TwoSixWireProtocolSupport(); | ||
} | ||
function disconnectHandler(self, type, ns, cmd, options, callback) { | ||
@@ -352,6 +281,2 @@ // Topology is not connected, save the call in the provided store to be | ||
// Perform ismaster call | ||
// Query options | ||
var queryOptions = { numberToSkip: 0, numberToReturn: -1, checkKeys: false, slaveOk: true }; | ||
// Create a query instance | ||
var query = new Query(self.s.bson, 'admin.$cmd', { ismaster: true }, queryOptions); | ||
// Get start time | ||
@@ -361,4 +286,5 @@ var start = new Date().getTime(); | ||
// Execute the ismaster query | ||
self.s.pool.write( | ||
query, | ||
self.command( | ||
'admin.$cmd', | ||
{ ismaster: true }, | ||
{ | ||
@@ -371,3 +297,3 @@ socketTimeout: | ||
}, | ||
function(err, result) { | ||
(err, result) => { | ||
// Set initial lastIsMasterMS | ||
@@ -388,3 +314,3 @@ self.lastIsMasterMS = new Date().getTime() - start; | ||
var eventHandler = function(self, event) { | ||
return function(err) { | ||
return function(err, conn) { | ||
// Log information of received information if in info mode | ||
@@ -400,122 +326,63 @@ if (self.s.logger.isInfo()) { | ||
if (event === 'connect') { | ||
// Issue an ismaster command at connect | ||
// Query options | ||
var queryOptions = { numberToSkip: 0, numberToReturn: -1, checkKeys: false, slaveOk: true }; | ||
// Create a query instance | ||
var compressors = | ||
self.s.compression && self.s.compression.compressors ? self.s.compression.compressors : []; | ||
var query = new Query( | ||
self.s.bson, | ||
'admin.$cmd', | ||
Object.assign( | ||
{ ismaster: true, client: self.clientInfo, compression: compressors }, | ||
getSaslSupportedMechs(self.s.options) | ||
), | ||
queryOptions | ||
); | ||
// Get start time | ||
var start = new Date().getTime(); | ||
// Execute the ismaster query | ||
self.s.pool.write( | ||
query, | ||
{ | ||
socketTimeout: self.s.options.connectionTimeout || 2000 | ||
}, | ||
function(err, result) { | ||
// Set initial lastIsMasterMS | ||
self.lastIsMasterMS = new Date().getTime() - start; | ||
self.initialConnect = false; | ||
self.ismaster = conn.ismaster; | ||
self.lastIsMasterMS = conn.lastIsMasterMS; | ||
if (conn.agreedCompressor) { | ||
self.s.pool.options.agreedCompressor = conn.agreedCompressor; | ||
} | ||
const serverError = extractIsMasterError(err, result); | ||
if (conn.zlibCompressionLevel) { | ||
self.s.pool.options.zlibCompressionLevel = conn.zlibCompressionLevel; | ||
} | ||
if (serverError) { | ||
self.destroy(); | ||
return self.emit('error', serverError); | ||
} | ||
if (conn.ismaster.$clusterTime) { | ||
const $clusterTime = conn.ismaster.$clusterTime; | ||
self.clusterTime = $clusterTime; | ||
} | ||
if (!isSupportedServer(result.result)) { | ||
self.destroy(); | ||
const latestSupportedVersion = '2.6'; | ||
const message = | ||
'Server at ' + | ||
self.s.options.host + | ||
':' + | ||
self.s.options.port + | ||
' reports wire version ' + | ||
(result.result.maxWireVersion || 0) + | ||
', but this version of Node.js Driver requires at least 2 (MongoDB' + | ||
latestSupportedVersion + | ||
').'; | ||
return self.emit('error', new MongoError(message), self); | ||
} | ||
// It's a proxy change the type so | ||
// the wireprotocol will send $readPreference | ||
if (self.ismaster.msg === 'isdbgrid') { | ||
self._type = 'mongos'; | ||
} | ||
// Determine whether the server is instructing us to use a compressor | ||
if (result.result && result.result.compression) { | ||
for (var i = 0; i < self.s.compression.compressors.length; i++) { | ||
if (result.result.compression.indexOf(self.s.compression.compressors[i]) > -1) { | ||
self.s.pool.options.agreedCompressor = self.s.compression.compressors[i]; | ||
break; | ||
} | ||
} | ||
// Have we defined self monitoring | ||
if (self.s.monitoring) { | ||
self.monitoringProcessId = setTimeout(monitoringProcess(self), self.s.monitoringInterval); | ||
} | ||
if (self.s.compression.zlibCompressionLevel) { | ||
self.s.pool.options.zlibCompressionLevel = self.s.compression.zlibCompressionLevel; | ||
// Emit server description changed if something listening | ||
sdam.emitServerDescriptionChanged(self, { | ||
address: self.name, | ||
arbiters: [], | ||
hosts: [], | ||
passives: [], | ||
type: sdam.getTopologyType(self) | ||
}); | ||
if (!self.s.inTopology) { | ||
// Emit topology description changed if something listening | ||
sdam.emitTopologyDescriptionChanged(self, { | ||
topologyType: 'Single', | ||
servers: [ | ||
{ | ||
address: self.name, | ||
arbiters: [], | ||
hosts: [], | ||
passives: [], | ||
type: sdam.getTopologyType(self) | ||
} | ||
} | ||
] | ||
}); | ||
} | ||
// Ensure no error emitted after initial connect when reconnecting | ||
self.initialConnect = false; | ||
// Save the ismaster | ||
self.ismaster = result.result; | ||
// Log the ismaster if available | ||
if (self.s.logger.isInfo()) { | ||
self.s.logger.info( | ||
f('server %s connected with ismaster [%s]', self.name, JSON.stringify(self.ismaster)) | ||
); | ||
} | ||
// It's a proxy change the type so | ||
// the wireprotocol will send $readPreference | ||
if (self.ismaster.msg === 'isdbgrid') { | ||
self._type = 'mongos'; | ||
} | ||
// Add the correct wire protocol handler | ||
self.wireProtocolHandler = configureWireProtocolHandler(self, self.ismaster); | ||
// Have we defined self monitoring | ||
if (self.s.monitoring) { | ||
self.monitoringProcessId = setTimeout( | ||
monitoringProcess(self), | ||
self.s.monitoringInterval | ||
); | ||
} | ||
// Emit server description changed if something listening | ||
sdam.emitServerDescriptionChanged(self, { | ||
address: self.name, | ||
arbiters: [], | ||
hosts: [], | ||
passives: [], | ||
type: sdam.getTopologyType(self) | ||
}); | ||
if (!self.s.inTopology) { | ||
// Emit topology description changed if something listening | ||
sdam.emitTopologyDescriptionChanged(self, { | ||
topologyType: 'Single', | ||
servers: [ | ||
{ | ||
address: self.name, | ||
arbiters: [], | ||
hosts: [], | ||
passives: [], | ||
type: sdam.getTopologyType(self) | ||
} | ||
] | ||
}); | ||
} | ||
// Log the ismaster if available | ||
if (self.s.logger.isInfo()) { | ||
self.s.logger.info( | ||
f('server %s connected with ismaster [%s]', self.name, JSON.stringify(self.ismaster)) | ||
); | ||
} | ||
// Emit connect | ||
self.emit('connect', self); | ||
} | ||
); | ||
// Emit connect | ||
self.emit('connect', self); | ||
} else if ( | ||
@@ -602,4 +469,2 @@ event === 'error' || | ||
* Initiate server connect | ||
* @method | ||
* @param {array} [options.auth=null] Array of auth options to apply on connect | ||
*/ | ||
@@ -644,11 +509,16 @@ Server.prototype.connect = function(options) { | ||
// Connect with optional auth settings | ||
if (options.auth) { | ||
self.s.pool.connect.apply(self.s.pool, options.auth); | ||
} else { | ||
self.s.pool.connect(); | ||
} | ||
self.s.pool.connect(); | ||
}; | ||
/** | ||
* Authenticate the topology. | ||
* @method | ||
* @param {MongoCredentials} credentials The credentials for authentication we are using | ||
* @param {authResultCallback} callback A callback function | ||
*/ | ||
Server.prototype.auth = function(credentials, callback) { | ||
callback(null, null); | ||
}; | ||
/** | ||
* Get the server description | ||
@@ -770,3 +640,3 @@ * @method | ||
self.wireProtocolHandler.command(self, ns, cmd, options, callback); | ||
wireProtocol.command(self, ns, cmd, options, callback); | ||
}; | ||
@@ -802,3 +672,3 @@ | ||
// Execute write | ||
return self.wireProtocolHandler.insert(self, ns, ops, options, callback); | ||
return wireProtocol.insert(self, ns, ops, options, callback); | ||
}; | ||
@@ -838,3 +708,3 @@ | ||
// Execute write | ||
return self.wireProtocolHandler.update(self, ns, ops, options, callback); | ||
return wireProtocol.update(self, ns, ops, options, callback); | ||
}; | ||
@@ -874,3 +744,3 @@ | ||
// Execute write | ||
return self.wireProtocolHandler.remove(self, ns, ops, options, callback); | ||
return wireProtocol.remove(self, ns, ops, options, callback); | ||
}; | ||
@@ -905,48 +775,2 @@ | ||
/** | ||
* Logout from a database | ||
* @method | ||
* @param {string} db The db we are logging out from | ||
* @param {authResultCallback} callback A callback function | ||
*/ | ||
Server.prototype.logout = function(dbName, callback) { | ||
this.s.pool.logout(dbName, callback); | ||
}; | ||
/** | ||
* Authenticate using a specified mechanism | ||
* @method | ||
* @param {string} mechanism The Auth mechanism we are invoking | ||
* @param {string} db The db we are invoking the mechanism against | ||
* @param {...object} param Parameters for the specific mechanism | ||
* @param {authResultCallback} callback A callback function | ||
*/ | ||
Server.prototype.auth = function(mechanism, db) { | ||
var self = this; | ||
if (mechanism === 'default') { | ||
mechanism = getDefaultAuthMechanism(self.ismaster); | ||
} | ||
// Slice all the arguments off | ||
var args = Array.prototype.slice.call(arguments, 0); | ||
// Set the mechanism | ||
args[0] = mechanism; | ||
// Get the callback | ||
var callback = args[args.length - 1]; | ||
// If we are not connected or have a disconnectHandler specified | ||
if (disconnectHandler(self, 'auth', db, args, {}, callback)) { | ||
return; | ||
} | ||
// Do not authenticate if we are an arbiter | ||
if (this.lastIsMaster() && this.lastIsMaster().arbiterOnly) { | ||
return callback(null, true); | ||
} | ||
// Apply the arguments to the pool | ||
self.s.pool.auth.apply(self.s.pool, args); | ||
}; | ||
/** | ||
* Compare two server instances | ||
@@ -974,2 +798,6 @@ * @method | ||
* Selects a server | ||
* @method | ||
* @param {function} selector Unused | ||
* @param {ReadPreference} [options.readPreference] Unused | ||
* @param {ClientSession} [options.session] Unused | ||
* @return {Server} | ||
@@ -976,0 +804,0 @@ */ |
@@ -104,4 +104,16 @@ 'use strict'; | ||
if (options.readPreference) this.options.readPreference = options.readPreference; | ||
// TODO: This isn't technically necessary | ||
this._pinnedServer = undefined; | ||
this._recoveryToken = undefined; | ||
} | ||
get server() { | ||
return this._pinnedServer; | ||
} | ||
get recoveryToken() { | ||
return this._recoveryToken; | ||
} | ||
/** | ||
@@ -126,2 +138,5 @@ * @ignore | ||
this.state = nextState; | ||
if (this.state === TxnState.NO_TRANSACTION || this.state === TxnState.STARTING_TRANSACTION) { | ||
this.unpinServer(); | ||
} | ||
return; | ||
@@ -134,4 +149,14 @@ } | ||
} | ||
pinServer(server) { | ||
if (this.isActive) { | ||
this._pinnedServer = server; | ||
} | ||
} | ||
unpinServer() { | ||
this._pinnedServer = undefined; | ||
} | ||
} | ||
module.exports = { TxnState, Transaction }; |
@@ -78,2 +78,21 @@ 'use strict'; | ||
/** | ||
* A helper function for determining `maxWireVersion` between legacy and new topology | ||
* instances | ||
* | ||
* @private | ||
* @param {(Topology|Server)} topologyOrServer | ||
*/ | ||
function maxWireVersion(topologyOrServer) { | ||
if (topologyOrServer.ismaster) { | ||
return topologyOrServer.ismaster.maxWireVersion; | ||
} | ||
if (topologyOrServer.description) { | ||
return topologyOrServer.description.maxWireVersion; | ||
} | ||
return null; | ||
} | ||
/* | ||
@@ -88,5 +107,15 @@ * Checks that collation is supported by server. | ||
function collationNotSupported(server, cmd) { | ||
return cmd && cmd.collation && server.ismaster && server.ismaster.maxWireVersion < 5; | ||
return cmd && cmd.collation && maxWireVersion(server) < 5; | ||
} | ||
/** | ||
* Checks if a given value is a Promise | ||
* | ||
* @param {*} maybePromise | ||
* @return true if the provided value is a Promise | ||
*/ | ||
function isPromiseLike(maybePromise) { | ||
return maybePromise && typeof maybePromise.then === 'function'; | ||
} | ||
module.exports = { | ||
@@ -98,3 +127,5 @@ uuidV4, | ||
retrieveEJSON, | ||
retrieveKerberos | ||
retrieveKerberos, | ||
maxWireVersion, | ||
isPromiseLike | ||
}; |
'use strict'; | ||
var ReadPreference = require('../topologies/read_preference'), | ||
MongoError = require('../error').MongoError; | ||
const ReadPreference = require('../topologies/read_preference'); | ||
const MongoError = require('../error').MongoError; | ||
const ServerType = require('../sdam/server_description').ServerType; | ||
const TopologyDescription = require('../sdam/topology_description').TopologyDescription; | ||
var MESSAGE_HEADER_SIZE = 16; | ||
const MESSAGE_HEADER_SIZE = 16; | ||
const COMPRESSION_DETAILS_SIZE = 9; // originalOpcode + uncompressedSize, compressorID | ||
@@ -18,3 +21,4 @@ // OPCODE Numbers | ||
OP_KILL_CURSORS: 2007, | ||
OP_COMPRESSED: 2012 | ||
OP_COMPRESSED: 2012, | ||
OP_MSG: 2013 | ||
}; | ||
@@ -76,6 +80,15 @@ | ||
function isMongos(server) { | ||
if (server.type === 'mongos') return true; | ||
if (server.parent && server.parent.type === 'mongos') return true; | ||
// NOTE: handle unified topology | ||
function isSharded(topologyOrServer) { | ||
if (topologyOrServer.type === 'mongos') return true; | ||
if (topologyOrServer.description && topologyOrServer.description.type === ServerType.Mongos) { | ||
return true; | ||
} | ||
// NOTE: This is incredibly inefficient, and should be removed once command construction | ||
// happens based on `Server` not `Topology`. | ||
if (topologyOrServer.description && topologyOrServer.description instanceof TopologyDescription) { | ||
const servers = Array.from(topologyOrServer.description.servers.values()); | ||
return servers.some(server => server.type === ServerType.Mongos); | ||
} | ||
return false; | ||
@@ -97,8 +110,9 @@ } | ||
MESSAGE_HEADER_SIZE, | ||
COMPRESSION_DETAILS_SIZE, | ||
opcodes, | ||
parseHeader, | ||
applyCommonQueryOptions, | ||
isMongos, | ||
isSharded, | ||
databaseNamespace, | ||
collectionNamespace | ||
}; |
{ | ||
"name": "mongodb-core", | ||
"version": "3.1.11", | ||
"version": "3.2.0-beta1", | ||
"description": "Core MongoDB driver functionality, no bells and whistles and meant for integration not end applications", | ||
@@ -40,3 +40,3 @@ "main": "index.js", | ||
"mongodb-extjson": "^2.1.2", | ||
"mongodb-mock-server": "^1.0.0", | ||
"mongodb-mock-server": "^1.0.1", | ||
"mongodb-test-runner": "^1.1.18", | ||
@@ -43,0 +43,0 @@ "prettier": "~1.12.0", |
SPDX disjunction
LicenseSPDX disjunction for an artifact's license information
Found 1 instance in 1 package
No v1
QualityPackage is not semver >=1. This means it is not stable and does not support ^ ranges.
Found 1 instance in 1 package
SPDX disjunction
LicenseSPDX disjunction for an artifact's license information
Found 1 instance in 1 package
50
556005
13664
1