Socket
Socket
Sign inDemoInstall

mongodb-core

Package Overview
Dependencies
8
Maintainers
3
Versions
177
Alerts
File Explorer

Advanced tools

Install Socket

Detect and block malicious and high-risk dependencies

Install

Comparing version 3.1.11 to 3.2.0-beta1

lib/auth/auth_provider.js

2

index.js

@@ -36,5 +36,7 @@ 'use strict';

EJSON: EJSON,
Topology: require('./lib/sdam/topology'),
// Raw operations
Query: require('./lib/connection/commands').Query,
// Auth mechanisms
MongoCredentials: require('./lib/auth/mongo_credentials').MongoCredentials,
defaultAuthProviders: require('./lib/auth/defaultAuthProviders').defaultAuthProviders,

@@ -41,0 +43,0 @@ MongoCR: require('./lib/auth/mongocr'),

332

lib/auth/gssapi.js
'use strict';
const f = require('util').format;
const Query = require('../connection/commands').Query;
const MongoError = require('../error').MongoError;
const AuthProvider = require('./auth_provider').AuthProvider;
const retrieveKerberos = require('../utils').retrieveKerberos;
let kerberos;
var AuthSession = function(db, username, password, options) {
this.db = db;
this.username = username;
this.password = password;
this.options = options;
};
AuthSession.prototype.equal = function(session) {
return (
session.db === this.db &&
session.username === this.username &&
session.password === this.password
);
};
/**
* Creates a new GSSAPI authentication mechanism
* @class
* @return {GSSAPI} A cursor instance
* @extends AuthProvider
*/
var GSSAPI = function(bson) {
this.bson = bson;
this.authStore = [];
};
class GSSAPI extends AuthProvider {
/**
* Implementation of authentication for a single connection
* @override
*/
_authenticateSingleConnection(sendAuthCommand, connection, credentials, callback) {
const source = credentials.source;
const username = credentials.username;
const password = credentials.password;
const mechanismProperties = credentials.mechanismProperties;
const gssapiServiceName =
mechanismProperties['gssapiservicename'] ||
mechanismProperties['gssapiServiceName'] ||
'mongodb';
/**
* Authenticate
* @method
* @param {{Server}|{ReplSet}|{Mongos}} server Topology the authentication method is being called on
* @param {[]Connections} connections Connections to authenticate using this authenticator
* @param {string} db Name of the database
* @param {string} username Username
* @param {string} password Password
* @param {authResultCallback} callback The callback to return the result from the authentication
* @return {object}
*/
GSSAPI.prototype.auth = function(server, connections, db, username, password, options, callback) {
var self = this;
let kerberos;
try {
kerberos = retrieveKerberos();
} catch (e) {
return callback(e, null);
GSSAPIInitialize(
this,
kerberos.processes.MongoAuthProcess,
source,
username,
password,
source,
gssapiServiceName,
sendAuthCommand,
connection,
mechanismProperties,
callback
);
}
// TODO: remove this once we fix URI parsing
var gssapiServiceName = options['gssapiservicename'] || options['gssapiServiceName'] || 'mongodb';
// Total connections
var count = connections.length;
if (count === 0) return callback(null, null);
/**
* Authenticate
* @override
* @method
*/
auth(sendAuthCommand, connections, credentials, callback) {
if (kerberos == null) {
try {
kerberos = retrieveKerberos();
} catch (e) {
return callback(e, null);
}
}
// Valid connections
var numberOfValidConnections = 0;
var errorObject = null;
// For each connection we need to authenticate
while (connections.length > 0) {
// Execute MongoCR
var execute = function(connection) {
// Start Auth process for a connection
GSSAPIInitialize(
self,
kerberos.processes.MongoAuthProcess,
db,
username,
password,
db,
gssapiServiceName,
server,
connection,
options,
function(err, r) {
// Adjust count
count = count - 1;
// If we have an error
if (err) {
errorObject = err;
} else if (r.result['$err']) {
errorObject = r.result;
} else if (r.result['errmsg']) {
errorObject = r.result;
} else {
numberOfValidConnections = numberOfValidConnections + 1;
}
// We have authenticated all connections
if (count === 0 && numberOfValidConnections > 0) {
// Store the auth details
addAuthSession(self.authStore, new AuthSession(db, username, password, options));
// Return correct authentication
callback(null, true);
} else if (count === 0) {
if (errorObject == null)
errorObject = new MongoError(f('failed to authenticate using mongocr'));
callback(errorObject, false);
}
}
);
};
var _execute = function(_connection) {
process.nextTick(function() {
execute(_connection);
});
};
_execute(connections.shift());
super.auth(sendAuthCommand, connections, credentials, callback);
}
};
}

@@ -129,3 +70,3 @@ //

gssapiServiceName,
server,
sendAuthCommand,
connection,

@@ -160,3 +101,3 @@ options,

authdb,
server,
sendAuthCommand,
connection,

@@ -179,3 +120,3 @@ callback

authdb,
server,
sendAuthCommand,
connection,

@@ -193,32 +134,24 @@ callback

// Write the commmand on the connection
server(
connection,
new Query(self.bson, '$external.$cmd', command, {
numberToSkip: 0,
numberToReturn: 1
}),
function(err, r) {
sendAuthCommand(connection, '$external.$cmd', command, (err, doc) => {
if (err) return callback(err, false);
// Execute mongodb transition
mongo_auth_process.transition(doc.payload, function(err, payload) {
if (err) return callback(err, false);
var doc = r.result;
// Execute mongodb transition
mongo_auth_process.transition(r.result.payload, function(err, payload) {
if (err) return callback(err, false);
// MongoDB API Second Step
MongoDBGSSAPISecondStep(
self,
mongo_auth_process,
payload,
doc,
db,
username,
password,
authdb,
server,
connection,
callback
);
});
}
);
// MongoDB API Second Step
MongoDBGSSAPISecondStep(
self,
mongo_auth_process,
payload,
doc,
db,
username,
password,
authdb,
sendAuthCommand,
connection,
callback
);
});
});
};

@@ -237,3 +170,3 @@

authdb,
server,
sendAuthCommand,
connection,

@@ -251,32 +184,24 @@ callback

// Write the commmand on the connection
server(
connection,
new Query(self.bson, '$external.$cmd', command, {
numberToSkip: 0,
numberToReturn: 1
}),
function(err, r) {
sendAuthCommand(connection, '$external.$cmd', command, (err, doc) => {
if (err) return callback(err, false);
// Call next transition for kerberos
mongo_auth_process.transition(doc.payload, function(err, payload) {
if (err) return callback(err, false);
var doc = r.result;
// Call next transition for kerberos
mongo_auth_process.transition(doc.payload, function(err, payload) {
if (err) return callback(err, false);
// Call the last and third step
MongoDBGSSAPIThirdStep(
self,
mongo_auth_process,
payload,
doc,
db,
username,
password,
authdb,
server,
connection,
callback
);
});
}
);
// Call the last and third step
MongoDBGSSAPIThirdStep(
self,
mongo_auth_process,
payload,
doc,
db,
username,
password,
authdb,
sendAuthCommand,
connection,
callback
);
});
});
};

@@ -293,3 +218,3 @@

authdb,
server,
sendAuthCommand,
connection,

@@ -306,41 +231,8 @@ callback

// Execute the command
server(
connection,
new Query(self.bson, '$external.$cmd', command, {
numberToSkip: 0,
numberToReturn: 1
}),
function(err, r) {
if (err) return callback(err, false);
mongo_auth_process.transition(null, function(err) {
if (err) return callback(err, null);
callback(null, r);
});
}
);
};
// Add to store only if it does not exist
var addAuthSession = function(authStore, session) {
var found = false;
for (var i = 0; i < authStore.length; i++) {
if (authStore[i].equal(session)) {
found = true;
break;
}
}
if (!found) authStore.push(session);
};
/**
* Remove authStore credentials
* @method
* @param {string} db Name of database we are removing authStore details about
* @return {object}
*/
GSSAPI.prototype.logout = function(dbName) {
this.authStore = this.authStore.filter(function(x) {
return x.db !== dbName;
sendAuthCommand(connection, '$external.$cmd', command, (err, r) => {
if (err) return callback(err, false);
mongo_auth_process.transition(null, function(err) {
if (err) return callback(err, null);
callback(null, r);
});
});

@@ -350,34 +242,2 @@ };

/**
* Re authenticate pool
* @method
* @param {{Server}|{ReplSet}|{Mongos}} server Topology the authentication method is being called on
* @param {[]Connections} connections Connections to authenticate using this authenticator
* @param {authResultCallback} callback The callback to return the result from the authentication
* @return {object}
*/
GSSAPI.prototype.reauthenticate = function(server, connections, callback) {
var authStore = this.authStore.slice(0);
var count = authStore.length;
if (count === 0) return callback(null, null);
// Iterate over all the auth details stored
for (var i = 0; i < authStore.length; i++) {
this.auth(
server,
connections,
authStore[i].db,
authStore[i].username,
authStore[i].password,
authStore[i].options,
function(err) {
count = count - 1;
// Done re-authenticating
if (count === 0) {
callback(err, null);
}
}
);
}
};
/**
* This is a result from a authentication strategy

@@ -384,0 +244,0 @@ *

'use strict';
var f = require('util').format,
crypto = require('crypto'),
Query = require('../connection/commands').Query,
MongoError = require('../error').MongoError;
const crypto = require('crypto');
const AuthProvider = require('./auth_provider').AuthProvider;
var AuthSession = function(db, username, password) {
this.db = db;
this.username = username;
this.password = password;
};
AuthSession.prototype.equal = function(session) {
return (
session.db === this.db &&
session.username === this.username &&
session.password === this.password
);
};
/**
* Creates a new MongoCR authentication mechanism
* @class
* @return {MongoCR} A cursor instance
*
* @extends AuthProvider
*/
var MongoCR = function(bson) {
this.bson = bson;
this.authStore = [];
};
class MongoCR extends AuthProvider {
/**
* Implementation of authentication for a single connection
* @override
*/
_authenticateSingleConnection(sendAuthCommand, connection, credentials, callback) {
const username = credentials.username;
const password = credentials.password;
const source = credentials.source;
// Add to store only if it does not exist
var addAuthSession = function(authStore, session) {
var found = false;
sendAuthCommand(connection, `${source}.$cmd`, { getnonce: 1 }, (err, r) => {
let nonce = null;
let key = null;
for (var i = 0; i < authStore.length; i++) {
if (authStore[i].equal(session)) {
found = true;
break;
}
}
// Get nonce
if (err == null) {
nonce = r.nonce;
// Use node md5 generator
let md5 = crypto.createHash('md5');
// Generate keys used for authentication
md5.update(username + ':mongo:' + password, 'utf8');
const hash_password = md5.digest('hex');
// Final key
md5 = crypto.createHash('md5');
md5.update(nonce + username + hash_password, 'utf8');
key = md5.digest('hex');
}
if (!found) authStore.push(session);
};
const authenticateCommand = {
authenticate: 1,
user: username,
nonce,
key
};
/**
* Authenticate
* @method
* @param {{Server}|{ReplSet}|{Mongos}} server Topology the authentication method is being called on
* @param {[]Connections} connections Connections to authenticate using this authenticator
* @param {string} db Name of the database
* @param {string} username Username
* @param {string} password Password
* @param {authResultCallback} callback The callback to return the result from the authentication
* @return {object}
*/
MongoCR.prototype.auth = function(server, connections, db, username, password, callback) {
var self = this;
// Total connections
var count = connections.length;
if (count === 0) return callback(null, null);
// Valid connections
var numberOfValidConnections = 0;
var errorObject = null;
// For each connection we need to authenticate
while (connections.length > 0) {
// Execute MongoCR
var executeMongoCR = function(connection) {
// Write the commmand on the connection
server(
connection,
new Query(
self.bson,
f('%s.$cmd', db),
{
getnonce: 1
},
{
numberToSkip: 0,
numberToReturn: 1
}
),
function(err, r) {
var nonce = null;
var key = null;
// Adjust the number of connections left
// Get nonce
if (err == null) {
nonce = r.result.nonce;
// Use node md5 generator
var md5 = crypto.createHash('md5');
// Generate keys used for authentication
md5.update(username + ':mongo:' + password, 'utf8');
var hash_password = md5.digest('hex');
// Final key
md5 = crypto.createHash('md5');
md5.update(nonce + username + hash_password, 'utf8');
key = md5.digest('hex');
}
// Execute command
// Write the commmand on the connection
server(
connection,
new Query(
self.bson,
f('%s.$cmd', db),
{
authenticate: 1,
user: username,
nonce: nonce,
key: key
},
{
numberToSkip: 0,
numberToReturn: 1
}
),
function(err, r) {
count = count - 1;
// If we have an error
if (err) {
errorObject = err;
} else if (r.result['$err']) {
errorObject = r.result;
} else if (r.result['errmsg']) {
errorObject = r.result;
} else {
numberOfValidConnections = numberOfValidConnections + 1;
}
// We have authenticated all connections
if (count === 0 && numberOfValidConnections > 0) {
// Store the auth details
addAuthSession(self.authStore, new AuthSession(db, username, password));
// Return correct authentication
callback(null, true);
} else if (count === 0) {
if (errorObject == null)
errorObject = new MongoError(f('failed to authenticate using mongocr'));
callback(errorObject, false);
}
}
);
}
);
};
var _execute = function(_connection) {
process.nextTick(function() {
executeMongoCR(_connection);
});
};
_execute(connections.shift());
sendAuthCommand(connection, `${source}.$cmd`, authenticateCommand, callback);
});
}
};
}
/**
* Remove authStore credentials
* @method
* @param {string} db Name of database we are removing authStore details about
* @return {object}
*/
MongoCR.prototype.logout = function(dbName) {
this.authStore = this.authStore.filter(function(x) {
return x.db !== dbName;
});
};
/**
* Re authenticate pool
* @method
* @param {{Server}|{ReplSet}|{Mongos}} server Topology the authentication method is being called on
* @param {[]Connections} connections Connections to authenticate using this authenticator
* @param {authResultCallback} callback The callback to return the result from the authentication
* @return {object}
*/
MongoCR.prototype.reauthenticate = function(server, connections, callback) {
var authStore = this.authStore.slice(0);
var count = authStore.length;
if (count === 0) return callback(null, null);
// Iterate over all the auth details stored
for (var i = 0; i < authStore.length; i++) {
this.auth(
server,
connections,
authStore[i].db,
authStore[i].username,
authStore[i].password,
function(err) {
count = count - 1;
// Done re-authenticating
if (count === 0) {
callback(err, null);
}
}
);
}
};
/**
* This is a result from a authentication strategy
*
* @callback authResultCallback
* @param {error} error An error object. Set to null if no error present
* @param {boolean} result The result of the authentication process
*/
module.exports = MongoCR;
'use strict';
var f = require('util').format,
retrieveBSON = require('../connection/utils').retrieveBSON,
Query = require('../connection/commands').Query,
MongoError = require('../error').MongoError;
const retrieveBSON = require('../connection/utils').retrieveBSON;
const AuthProvider = require('./auth_provider').AuthProvider;
var BSON = retrieveBSON(),
Binary = BSON.Binary;
// TODO: can we get the Binary type from this.bson instead?
const BSON = retrieveBSON();
const Binary = BSON.Binary;
var AuthSession = function(db, username, password) {
this.db = db;
this.username = username;
this.password = password;
};
AuthSession.prototype.equal = function(session) {
return (
session.db === this.db &&
session.username === this.username &&
session.password === this.password
);
};
/**
* Creates a new Plain authentication mechanism
* @class
* @return {Plain} A cursor instance
*
* @extends AuthProvider
*/
var Plain = function(bson) {
this.bson = bson;
this.authStore = [];
};
/**
* Authenticate
* @method
* @param {{Server}|{ReplSet}|{Mongos}} server Topology the authentication method is being called on
* @param {[]Connections} connections Connections to authenticate using this authenticator
* @param {string} db Name of the database
* @param {string} username Username
* @param {string} password Password
* @param {authResultCallback} callback The callback to return the result from the authentication
* @return {object}
*/
Plain.prototype.auth = function(server, connections, db, username, password, callback) {
var self = this;
// Total connections
var count = connections.length;
if (count === 0) return callback(null, null);
// Valid connections
var numberOfValidConnections = 0;
var errorObject = null;
// For each connection we need to authenticate
while (connections.length > 0) {
// Execute MongoCR
var execute = function(connection) {
// Create payload
var payload = new Binary(f('\x00%s\x00%s', username, password));
// Let's start the sasl process
var command = {
saslStart: 1,
mechanism: 'PLAIN',
payload: payload,
autoAuthorize: 1
};
// Let's start the process
server(
connection,
new Query(self.bson, '$external.$cmd', command, {
numberToSkip: 0,
numberToReturn: 1
}),
function(err, r) {
// Adjust count
count = count - 1;
// If we have an error
if (err) {
errorObject = err;
} else if (r.result['$err']) {
errorObject = r.result;
} else if (r.result['errmsg']) {
errorObject = r.result;
} else {
numberOfValidConnections = numberOfValidConnections + 1;
}
// We have authenticated all connections
if (count === 0 && numberOfValidConnections > 0) {
// Store the auth details
addAuthSession(self.authStore, new AuthSession(db, username, password));
// Return correct authentication
callback(null, true);
} else if (count === 0) {
if (errorObject == null)
errorObject = new MongoError(f('failed to authenticate using mongocr'));
callback(errorObject, false);
}
}
);
class Plain extends AuthProvider {
/**
* Implementation of authentication for a single connection
* @override
*/
_authenticateSingleConnection(sendAuthCommand, connection, credentials, callback) {
const username = credentials.username;
const password = credentials.password;
const payload = new Binary(`\x00${username}\x00${password}`);
const command = {
saslStart: 1,
mechanism: 'PLAIN',
payload: payload,
autoAuthorize: 1
};
var _execute = function(_connection) {
process.nextTick(function() {
execute(_connection);
});
};
_execute(connections.shift());
sendAuthCommand(connection, '$external.$cmd', command, callback);
}
};
}
// Add to store only if it does not exist
var addAuthSession = function(authStore, session) {
var found = false;
for (var i = 0; i < authStore.length; i++) {
if (authStore[i].equal(session)) {
found = true;
break;
}
}
if (!found) authStore.push(session);
};
/**
* Remove authStore credentials
* @method
* @param {string} db Name of database we are removing authStore details about
* @return {object}
*/
Plain.prototype.logout = function(dbName) {
this.authStore = this.authStore.filter(function(x) {
return x.db !== dbName;
});
};
/**
* Re authenticate pool
* @method
* @param {{Server}|{ReplSet}|{Mongos}} server Topology the authentication method is being called on
* @param {[]Connections} connections Connections to authenticate using this authenticator
* @param {authResultCallback} callback The callback to return the result from the authentication
* @return {object}
*/
Plain.prototype.reauthenticate = function(server, connections, callback) {
var authStore = this.authStore.slice(0);
var count = authStore.length;
if (count === 0) return callback(null, null);
// Iterate over all the auth details stored
for (var i = 0; i < authStore.length; i++) {
this.auth(
server,
connections,
authStore[i].db,
authStore[i].username,
authStore[i].password,
function(err) {
count = count - 1;
// Done re-authenticating
if (count === 0) {
callback(err, null);
}
}
);
}
};
/**
* This is a result from a authentication strategy
*
* @callback authResultCallback
* @param {error} error An error object. Set to null if no error present
* @param {boolean} result The result of the authentication process
*/
module.exports = Plain;
'use strict';
var f = require('util').format,
crypto = require('crypto'),
retrieveBSON = require('../connection/utils').retrieveBSON,
Query = require('../connection/commands').Query,
MongoError = require('../error').MongoError,
Buffer = require('safe-buffer').Buffer;
const crypto = require('crypto');
const Buffer = require('safe-buffer').Buffer;
const retrieveBSON = require('../connection/utils').retrieveBSON;
const MongoError = require('../error').MongoError;
const AuthProvider = require('./auth_provider').AuthProvider;
const BSON = retrieveBSON();
const Binary = BSON.Binary;
let saslprep;
try {

@@ -18,33 +19,2 @@ saslprep = require('saslprep');

var BSON = retrieveBSON(),
Binary = BSON.Binary;
var AuthSession = function(db, username, password) {
this.db = db;
this.username = username;
this.password = password;
};
AuthSession.prototype.equal = function(session) {
return (
session.db === this.db &&
session.username === this.username &&
session.password === this.password
);
};
var id = 0;
/**
* Creates a new ScramSHA authentication mechanism
* @class
* @return {ScramSHA} A cursor instance
*/
var ScramSHA = function(bson, cryptoMethod) {
this.bson = bson;
this.authStore = [];
this.id = id++;
this.cryptoMethod = cryptoMethod || 'sha1';
};
var parsePayload = function(payload) {

@@ -140,50 +110,49 @@ var dict = {};

/**
* Authenticate
* @method
* @param {{Server}|{ReplSet}|{Mongos}} server Topology the authentication method is being called on
* @param {[]Connections} connections Connections to authenticate using this authenticator
* @param {string} db Name of the database
* @param {string} username Username
* @param {string} password Password
* @param {authResultCallback} callback The callback to return the result from the authentication
* @return {object}
* Creates a new ScramSHA authentication mechanism
* @class
* @extends AuthProvider
*/
ScramSHA.prototype.auth = function(server, connections, db, username, password, callback) {
var self = this;
// Total connections
var count = connections.length;
if (count === 0) return callback(null, null);
class ScramSHA extends AuthProvider {
constructor(bson, cryptoMethod) {
super(bson);
this.cryptoMethod = cryptoMethod || 'sha1';
}
// Valid connections
var numberOfValidConnections = 0;
var errorObject = null;
static _getError(err, r) {
if (err) {
return err;
}
const cryptoMethod = this.cryptoMethod;
let mechanism = 'SCRAM-SHA-1';
let processedPassword;
if (r.$err || r.errmsg) {
return new MongoError(r);
}
}
if (cryptoMethod === 'sha256') {
mechanism = 'SCRAM-SHA-256';
/**
* @ignore
*/
_executeScram(sendAuthCommand, connection, credentials, nonce, callback) {
let username = credentials.username;
const password = credentials.password;
const db = credentials.source;
let saslprepFn = (server.s && server.s.saslprep) || saslprep;
const cryptoMethod = this.cryptoMethod;
let mechanism = 'SCRAM-SHA-1';
let processedPassword;
if (saslprepFn) {
processedPassword = saslprepFn(password);
if (cryptoMethod === 'sha256') {
mechanism = 'SCRAM-SHA-256';
processedPassword = saslprep ? saslprep(password) : password;
} else {
console.warn('Warning: no saslprep library specified. Passwords will not be sanitized');
processedPassword = password;
try {
processedPassword = passwordDigest(username, password);
} catch (e) {
return callback(e);
}
}
} else {
processedPassword = passwordDigest(username, password);
}
// Execute MongoCR
var executeScram = function(connection) {
// Clean up the user
username = username.replace('=', '=3D').replace(',', '=2C');
// Create a random nonce
var nonce = crypto.randomBytes(24).toString('base64');
// var nonce = 'MsQUY9iw0T9fx2MUEz6LZPwGuhVvWAhc'
// NOTE: This is done b/c Javascript uses UTF-16, but the server is hashing in UTF-8.

@@ -199,5 +168,5 @@ // Since the username is not sasl-prep-d, we need to do this here.

// Build command structure
var cmd = {
const saslStartCmd = {
saslStart: 1,
mechanism: mechanism,
mechanism,
payload: new Binary(Buffer.concat([Buffer.from('n,,', 'utf8'), firstBare])),

@@ -207,228 +176,104 @@ autoAuthorize: 1

// Handle the error
var handleError = function(err, r) {
if (err) {
numberOfValidConnections = numberOfValidConnections - 1;
errorObject = err;
return false;
} else if (r.result['$err']) {
errorObject = r.result;
return false;
} else if (r.result['errmsg']) {
errorObject = r.result;
return false;
} else {
numberOfValidConnections = numberOfValidConnections + 1;
// Write the commmand on the connection
sendAuthCommand(connection, `${db}.$cmd`, saslStartCmd, (err, r) => {
let tmpError = ScramSHA._getError(err, r);
if (tmpError) {
return callback(tmpError, null);
}
return true;
};
const dict = parsePayload(r.payload.value());
const iterations = parseInt(dict.i, 10);
const salt = dict.s;
const rnonce = dict.r;
// Finish up
var finish = function(_count, _numberOfValidConnections) {
if (_count === 0 && _numberOfValidConnections > 0) {
// Store the auth details
addAuthSession(self.authStore, new AuthSession(db, username, password));
// Return correct authentication
return callback(null, true);
} else if (_count === 0) {
if (errorObject == null)
errorObject = new MongoError(f('failed to authenticate using scram'));
return callback(errorObject, false);
// Set up start of proof
const withoutProof = `c=biws,r=${rnonce}`;
const saltedPassword = HI(
processedPassword,
Buffer.from(salt, 'base64'),
iterations,
cryptoMethod
);
if (iterations && iterations < 4096) {
const error = new MongoError(`Server returned an invalid iteration count ${iterations}`);
return callback(error, false);
}
};
var handleEnd = function(_err, _r) {
// Handle any error
handleError(_err, _r);
// Adjust the number of connections
count = count - 1;
// Execute the finish
finish(count, numberOfValidConnections);
};
const clientKey = HMAC(cryptoMethod, saltedPassword, 'Client Key');
const storedKey = H(cryptoMethod, clientKey);
const authMessage = [firstBare, r.payload.value().toString('base64'), withoutProof].join(',');
// Write the commmand on the connection
server(
connection,
new Query(self.bson, f('%s.$cmd', db), cmd, {
numberToSkip: 0,
numberToReturn: 1
}),
function(err, r) {
// Do we have an error, handle it
if (handleError(err, r) === false) {
count = count - 1;
const clientSignature = HMAC(cryptoMethod, storedKey, authMessage);
const clientProof = `p=${xor(clientKey, clientSignature)}`;
const clientFinal = [withoutProof, clientProof].join(',');
const saslContinueCmd = {
saslContinue: 1,
conversationId: r.conversationId,
payload: new Binary(Buffer.from(clientFinal))
};
if (count === 0 && numberOfValidConnections > 0) {
// Store the auth details
addAuthSession(self.authStore, new AuthSession(db, username, password));
// Return correct authentication
return callback(null, true);
} else if (count === 0) {
if (errorObject == null)
errorObject = new MongoError(f('failed to authenticate using scram'));
return callback(errorObject, false);
}
return;
sendAuthCommand(connection, `${db}.$cmd`, saslContinueCmd, (err, r) => {
if (!r || r.done !== false) {
return callback(err, r);
}
// Get the dictionary
var dict = parsePayload(r.result.payload.value());
// Unpack dictionary
var iterations = parseInt(dict.i, 10);
var salt = dict.s;
var rnonce = dict.r;
// Set up start of proof
var withoutProof = f('c=biws,r=%s', rnonce);
var saltedPassword = HI(
processedPassword,
Buffer.from(salt, 'base64'),
iterations,
cryptoMethod
);
if (iterations && iterations < 4096) {
const error = new MongoError(`Server returned an invalid iteration count ${iterations}`);
return callback(error, false);
}
// Create the client key
const clientKey = HMAC(cryptoMethod, saltedPassword, 'Client Key');
// Create the stored key
const storedKey = H(cryptoMethod, clientKey);
// Create the authentication message
const authMessage = [
firstBare,
r.result.payload.value().toString('base64'),
withoutProof
].join(',');
// Create client signature
const clientSignature = HMAC(cryptoMethod, storedKey, authMessage);
// Create client proof
const clientProof = f('p=%s', xor(clientKey, clientSignature));
// Create client final
const clientFinal = [withoutProof, clientProof].join(',');
// Create continue message
const cmd = {
const retrySaslContinueCmd = {
saslContinue: 1,
conversationId: r.result.conversationId,
payload: new Binary(Buffer.from(clientFinal))
conversationId: r.conversationId,
payload: Buffer.alloc(0)
};
//
// Execute sasl continue
// Write the commmand on the connection
server(
connection,
new Query(self.bson, f('%s.$cmd', db), cmd, {
numberToSkip: 0,
numberToReturn: 1
}),
function(err, r) {
if (r && r.result.done === false) {
var cmd = {
saslContinue: 1,
conversationId: r.result.conversationId,
payload: Buffer.alloc(0)
};
sendAuthCommand(connection, `${db}.$cmd`, retrySaslContinueCmd, callback);
});
});
}
// Write the commmand on the connection
server(
connection,
new Query(self.bson, f('%s.$cmd', db), cmd, {
numberToSkip: 0,
numberToReturn: 1
}),
function(err, r) {
handleEnd(err, r);
}
);
} else {
handleEnd(err, r);
}
}
);
/**
* Implementation of authentication for a single connection
* @override
*/
_authenticateSingleConnection(sendAuthCommand, connection, credentials, callback) {
// Create a random nonce
crypto.randomBytes(24, (err, buff) => {
if (err) {
return callback(err, null);
}
);
};
var _execute = function(_connection) {
process.nextTick(function() {
executeScram(_connection);
return this._executeScram(
sendAuthCommand,
connection,
credentials,
buff.toString('base64'),
callback
);
});
};
}
// For each connection we need to authenticate
while (connections.length > 0) {
_execute(connections.shift());
/**
* Authenticate
* @override
* @method
*/
auth(sendAuthCommand, connections, credentials, callback) {
this._checkSaslprep();
super.auth(sendAuthCommand, connections, credentials, callback);
}
};
// Add to store only if it does not exist
var addAuthSession = function(authStore, session) {
var found = false;
_checkSaslprep() {
const cryptoMethod = this.cryptoMethod;
for (var i = 0; i < authStore.length; i++) {
if (authStore[i].equal(session)) {
found = true;
break;
if (cryptoMethod === 'sha256') {
if (!saslprep) {
console.warn('Warning: no saslprep library specified. Passwords will not be sanitized');
}
}
}
}
if (!found) authStore.push(session);
};
/**
* Remove authStore credentials
* @method
* @param {string} db Name of database we are removing authStore details about
* @return {object}
* Creates a new ScramSHA1 authentication mechanism
* @class
* @extends ScramSHA
*/
ScramSHA.prototype.logout = function(dbName) {
this.authStore = this.authStore.filter(function(x) {
return x.db !== dbName;
});
};
/**
* Re authenticate pool
* @method
* @param {{Server}|{ReplSet}|{Mongos}} server Topology the authentication method is being called on
* @param {[]Connections} connections Connections to authenticate using this authenticator
* @param {authResultCallback} callback The callback to return the result from the authentication
* @return {object}
*/
ScramSHA.prototype.reauthenticate = function(server, connections, callback) {
var authStore = this.authStore.slice(0);
var count = authStore.length;
// No connections
if (count === 0) return callback(null, null);
// Iterate over all the auth details stored
for (var i = 0; i < authStore.length; i++) {
this.auth(
server,
connections,
authStore[i].db,
authStore[i].username,
authStore[i].password,
function(err) {
count = count - 1;
// Done re-authenticating
if (count === 0) {
callback(err, null);
}
}
);
}
};
class ScramSHA1 extends ScramSHA {

@@ -440,2 +285,7 @@ constructor(bson) {

/**
* Creates a new ScramSHA256 authentication mechanism
* @class
* @extends ScramSHA
*/
class ScramSHA256 extends ScramSHA {

@@ -442,0 +292,0 @@ constructor(bson) {

'use strict';
const f = require('util').format;
const Query = require('../connection/commands').Query;
const MongoError = require('../error').MongoError;
const AuthProvider = require('./auth_provider').AuthProvider;
const retrieveKerberos = require('../utils').retrieveKerberos;
let kerberos;
var AuthSession = function(db, username, password, options) {
this.db = db;
this.username = username;
this.password = password;
this.options = options;
};
AuthSession.prototype.equal = function(session) {
return (
session.db === this.db &&
session.username === this.username &&
session.password === this.password
);
};
/**
* Creates a new SSPI authentication mechanism
* @class
* @return {SSPI} A cursor instance
* @extends AuthProvider
*/
var SSPI = function(bson) {
this.bson = bson;
this.authStore = [];
};
class SSPI extends AuthProvider {
/**
* Implementation of authentication for a single connection
* @override
*/
_authenticateSingleConnection(sendAuthCommand, connection, credentials, callback) {
// TODO: Destructure this
const username = credentials.username;
const password = credentials.password;
const mechanismProperties = credentials.mechanismProperties;
const gssapiServiceName =
mechanismProperties['gssapiservicename'] ||
mechanismProperties['gssapiServiceName'] ||
'mongodb';
/**
* Authenticate
* @method
* @param {{Server}|{ReplSet}|{Mongos}} server Topology the authentication method is being called on
* @param {[]Connections} connections Connections to authenticate using this authenticator
* @param {string} db Name of the database
* @param {string} username Username
* @param {string} password Password
* @param {authResultCallback} callback The callback to return the result from the authentication
* @return {object}
*/
SSPI.prototype.auth = function(server, connections, db, username, password, options, callback) {
var self = this;
let kerberos;
try {
kerberos = retrieveKerberos();
} catch (e) {
return callback(e, null);
SSIPAuthenticate(
this,
kerberos.processes.MongoAuthProcess,
username,
password,
gssapiServiceName,
sendAuthCommand,
connection,
mechanismProperties,
callback
);
}
var gssapiServiceName = options['gssapiServiceName'] || 'mongodb';
// Total connections
var count = connections.length;
if (count === 0) return callback(null, null);
/**
* Authenticate
* @override
* @method
*/
auth(sendAuthCommand, connections, credentials, callback) {
if (kerberos == null) {
try {
kerberos = retrieveKerberos();
} catch (e) {
return callback(e, null);
}
}
// Valid connections
var numberOfValidConnections = 0;
var errorObject = null;
// For each connection we need to authenticate
while (connections.length > 0) {
// Execute MongoCR
var execute = function(connection) {
// Start Auth process for a connection
SSIPAuthenticate(
self,
kerberos.processes.MongoAuthProcess,
username,
password,
gssapiServiceName,
server,
connection,
options,
function(err, r) {
// Adjust count
count = count - 1;
// If we have an error
if (err) {
errorObject = err;
} else if (r && typeof r === 'object' && r.result['$err']) {
errorObject = r.result;
} else if (r && typeof r === 'object' && r.result['errmsg']) {
errorObject = r.result;
} else {
numberOfValidConnections = numberOfValidConnections + 1;
}
// We have authenticated all connections
if (count === 0 && numberOfValidConnections > 0) {
// Store the auth details
addAuthSession(self.authStore, new AuthSession(db, username, password, options));
// Return correct authentication
callback(null, true);
} else if (count === 0) {
if (errorObject == null)
errorObject = new MongoError(f('failed to authenticate using mongocr'));
callback(errorObject, false);
}
}
);
};
var _execute = function(_connection) {
process.nextTick(function() {
execute(_connection);
});
};
_execute(connections.shift());
super.auth(sendAuthCommand, connections, credentials, callback);
}
};
}

@@ -122,3 +64,3 @@ function SSIPAuthenticate(

gssapiServiceName,
server,
sendAuthCommand,
connection,

@@ -136,8 +78,3 @@ options,

function authCommand(command, authCb) {
const query = new Query(self.bson, '$external.$cmd', command, {
numberToSkip: 0,
numberToReturn: 1
});
server(connection, query, authCb);
sendAuthCommand(connection, '$external.$cmd', command, authCb);
}

@@ -158,5 +95,4 @@

authCommand(command, (err, result) => {
authCommand(command, (err, doc) => {
if (err) return callback(err, false);
const doc = result.result;

@@ -171,5 +107,4 @@ authProcess.transition(doc.payload, (err, payload) => {

authCommand(command, (err, result) => {
authCommand(command, (err, doc) => {
if (err) return callback(err, false);
const doc = result.result;

@@ -200,68 +135,2 @@ authProcess.transition(doc.payload, (err, payload) => {

// Add to store only if it does not exist
var addAuthSession = function(authStore, session) {
var found = false;
for (var i = 0; i < authStore.length; i++) {
if (authStore[i].equal(session)) {
found = true;
break;
}
}
if (!found) authStore.push(session);
};
/**
* Remove authStore credentials
* @method
* @param {string} db Name of database we are removing authStore details about
* @return {object}
*/
SSPI.prototype.logout = function(dbName) {
this.authStore = this.authStore.filter(function(x) {
return x.db !== dbName;
});
};
/**
* Re authenticate pool
* @method
* @param {{Server}|{ReplSet}|{Mongos}} server Topology the authentication method is being called on
* @param {[]Connections} connections Connections to authenticate using this authenticator
* @param {authResultCallback} callback The callback to return the result from the authentication
* @return {object}
*/
SSPI.prototype.reauthenticate = function(server, connections, callback) {
var authStore = this.authStore.slice(0);
var count = authStore.length;
if (count === 0) return callback(null, null);
// Iterate over all the auth details stored
for (var i = 0; i < authStore.length; i++) {
this.auth(
server,
connections,
authStore[i].db,
authStore[i].username,
authStore[i].password,
authStore[i].options,
function(err) {
count = count - 1;
// Done re-authenticating
if (count === 0) {
callback(err, null);
}
}
);
}
};
/**
* This is a result from a authentication strategy
*
* @callback authResultCallback
* @param {error} error An error object. Set to null if no error present
* @param {boolean} result The result of the authentication process
*/
module.exports = SSPI;
'use strict';
var f = require('util').format,
Query = require('../connection/commands').Query,
MongoError = require('../error').MongoError;
const AuthProvider = require('./auth_provider').AuthProvider;
var AuthSession = function(db, username, password) {
this.db = db;
this.username = username;
this.password = password;
};
AuthSession.prototype.equal = function(session) {
return (
session.db === this.db &&
session.username === this.username &&
session.password === this.password
);
};
/**
* Creates a new X509 authentication mechanism
* @class
* @return {X509} A cursor instance
* @extends AuthProvider
*/
var X509 = function(bson) {
this.bson = bson;
this.authStore = [];
};
/**
* Authenticate
* @method
* @param {{Server}|{ReplSet}|{Mongos}} server Topology the authentication method is being called on
* @param {[]Connections} connections Connections to authenticate using this authenticator
* @param {string} db Name of the database
* @param {string} username Username
* @param {string} password Password
* @param {authResultCallback} callback The callback to return the result from the authentication
* @return {object}
*/
X509.prototype.auth = function(server, connections, db, username, password, callback) {
var self = this;
// Total connections
var count = connections.length;
if (count === 0) return callback(null, null);
// Valid connections
var numberOfValidConnections = 0;
var errorObject = null;
// For each connection we need to authenticate
while (connections.length > 0) {
// Execute MongoCR
var execute = function(connection) {
// Let's start the sasl process
var command = {
authenticate: 1,
mechanism: 'MONGODB-X509'
};
// Add username if specified
if (username) {
command.user = username;
}
// Let's start the process
server(
connection,
new Query(self.bson, '$external.$cmd', command, {
numberToSkip: 0,
numberToReturn: 1
}),
function(err, r) {
// Adjust count
count = count - 1;
// If we have an error
if (err) {
errorObject = err;
} else if (r.result['$err']) {
errorObject = r.result;
} else if (r.result['errmsg']) {
errorObject = r.result;
} else {
numberOfValidConnections = numberOfValidConnections + 1;
}
// We have authenticated all connections
if (count === 0 && numberOfValidConnections > 0) {
// Store the auth details
addAuthSession(self.authStore, new AuthSession(db, username, password));
// Return correct authentication
callback(null, true);
} else if (count === 0) {
if (errorObject == null)
errorObject = new MongoError(f('failed to authenticate using mongocr'));
callback(errorObject, false);
}
}
);
};
var _execute = function(_connection) {
process.nextTick(function() {
execute(_connection);
});
};
_execute(connections.shift());
}
};
// Add to store only if it does not exist
var addAuthSession = function(authStore, session) {
var found = false;
for (var i = 0; i < authStore.length; i++) {
if (authStore[i].equal(session)) {
found = true;
break;
class X509 extends AuthProvider {
/**
* Implementation of authentication for a single connection
* @override
*/
_authenticateSingleConnection(sendAuthCommand, connection, credentials, callback) {
const username = credentials.username;
const command = { authenticate: 1, mechanism: 'MONGODB-X509' };
if (username) {
command.user = username;
}
}
if (!found) authStore.push(session);
};
/**
* Remove authStore credentials
* @method
* @param {string} db Name of database we are removing authStore details about
* @return {object}
*/
X509.prototype.logout = function(dbName) {
this.authStore = this.authStore.filter(function(x) {
return x.db !== dbName;
});
};
/**
* Re authenticate pool
* @method
* @param {{Server}|{ReplSet}|{Mongos}} server Topology the authentication method is being called on
* @param {[]Connections} connections Connections to authenticate using this authenticator
* @param {authResultCallback} callback The callback to return the result from the authentication
* @return {object}
*/
X509.prototype.reauthenticate = function(server, connections, callback) {
var authStore = this.authStore.slice(0);
var count = authStore.length;
if (count === 0) return callback(null, null);
// Iterate over all the auth details stored
for (var i = 0; i < authStore.length; i++) {
this.auth(
server,
connections,
authStore[i].db,
authStore[i].username,
authStore[i].password,
function(err) {
count = count - 1;
// Done re-authenticating
if (count === 0) {
callback(err, null);
}
}
);
sendAuthCommand(connection, '$external.$cmd', command, callback);
}
};
}
/**
* This is a result from a authentication strategy
*
* @callback authResultCallback
* @param {error} error An error object. Set to null if no error present
* @param {boolean} result The result of the authentication process
*/
module.exports = X509;
'use strict';
const Msg = require('../connection/msg').Msg;
const KillCursor = require('../connection/commands').KillCursor;

@@ -20,3 +21,3 @@ const GetMore = require('../connection/commands').GetMore;

// helper methods
const extractCommandName = command => Object.keys(command)[0];
const extractCommandName = commandDoc => Object.keys(commandDoc)[0];
const namespace = command => command.ns;

@@ -79,2 +80,6 @@ const databaseName = command => command.ns.split('.')[0];

if (command instanceof Msg) {
return command.command;
}
if (command.query && command.query.$query) {

@@ -147,2 +152,5 @@ let result;

// in the event of a `noResponse` command, just return
if (reply === null) return reply;
return reply.result;

@@ -149,0 +157,0 @@ };

@@ -6,3 +6,2 @@ 'use strict';

var Long = BSON.Long;
const MongoError = require('../error').MongoError;
const Buffer = require('safe-buffer').Buffer;

@@ -468,49 +467,2 @@

//
// Single document and documentsReturnedIn set
//
if (this.numberReturned === 1 && documentsReturnedIn != null && raw) {
// Calculate the bson size
bsonSize =
this.data[this.index] |
(this.data[this.index + 1] << 8) |
(this.data[this.index + 2] << 16) |
(this.data[this.index + 3] << 24);
// Slice out the buffer containing the command result document
var document = this.data.slice(this.index, this.index + bsonSize);
// Set up field we wish to keep as raw
var fieldsAsRaw = {};
fieldsAsRaw[documentsReturnedIn] = true;
_options.fieldsAsRaw = fieldsAsRaw;
// Deserialize but keep the array of documents in non-parsed form
var doc = this.bson.deserialize(document, _options);
if (doc instanceof Error) {
throw doc;
}
if (doc.errmsg) {
throw new MongoError(doc.errmsg);
}
if (!doc.cursor) {
throw new MongoError('Cursor not found');
}
// Get the documents
this.documents = doc.cursor[documentsReturnedIn];
this.numberReturned = this.documents.length;
// Ensure we have a Long valie cursor id
this.cursorId =
typeof doc.cursor.id === 'number' ? Long.fromNumber(doc.cursor.id) : doc.cursor.id;
// Adjust the index
this.index = this.index + bsonSize;
// Set as parsed
this.parsed = true;
return;
}
//
// Parse Body

@@ -539,2 +491,11 @@ //

if (this.documents.length === 1 && documentsReturnedIn != null && raw) {
const fieldsAsRaw = {};
fieldsAsRaw[documentsReturnedIn] = true;
_options.fieldsAsRaw = fieldsAsRaw;
const doc = this.bson.deserialize(this.documents[0], _options);
this.documents = [doc];
}
// Set parsed

@@ -541,0 +502,0 @@ this.parsed = true;

'use strict';
var inherits = require('util').inherits,
EventEmitter = require('events').EventEmitter,
net = require('net'),
tls = require('tls'),
crypto = require('crypto'),
f = require('util').format,
debugOptions = require('./utils').debugOptions,
parseHeader = require('../wireprotocol/shared').parseHeader,
decompress = require('../wireprotocol/compression').decompress,
Response = require('./commands').Response,
MongoNetworkError = require('../error').MongoNetworkError,
Logger = require('./logger'),
OP_COMPRESSED = require('../wireprotocol/shared').opcodes.OP_COMPRESSED,
MESSAGE_HEADER_SIZE = require('../wireprotocol/shared').MESSAGE_HEADER_SIZE,
Buffer = require('safe-buffer').Buffer;
const EventEmitter = require('events').EventEmitter;
const crypto = require('crypto');
const debugOptions = require('./utils').debugOptions;
const parseHeader = require('../wireprotocol/shared').parseHeader;
const decompress = require('../wireprotocol/compression').decompress;
const Response = require('./commands').Response;
const BinMsg = require('./msg').BinMsg;
const MongoNetworkError = require('../error').MongoNetworkError;
const MongoError = require('../error').MongoError;
const Logger = require('./logger');
const OP_COMPRESSED = require('../wireprotocol/shared').opcodes.OP_COMPRESSED;
const OP_MSG = require('../wireprotocol/shared').opcodes.OP_MSG;
const MESSAGE_HEADER_SIZE = require('../wireprotocol/shared').MESSAGE_HEADER_SIZE;
const Buffer = require('safe-buffer').Buffer;
var _id = 0;
var debugFields = [
let _id = 0;
const DEFAULT_MAX_BSON_MESSAGE_SIZE = 1024 * 1024 * 16 * 4;
const DEBUG_FIELDS = [
'host',

@@ -29,3 +30,2 @@ 'port',

'socketTimeout',
'singleBufferSerializtion',
'ssl',

@@ -42,29 +42,9 @@ 'ca',

var connectionAccountingSpy = undefined;
var connectionAccounting = false;
var connections = {};
let connectionAccountingSpy = undefined;
let connectionAccounting = false;
let connections = {};
/**
* Creates a new Connection instance
* @class
* @param {string} options.host The server host
* @param {number} options.port The server port
* @param {number} [options.family=null] IP version for DNS lookup, passed down to Node's [`dns.lookup()` function](https://nodejs.org/api/dns.html#dns_dns_lookup_hostname_options_callback). If set to `6`, will only look for ipv6 addresses.
* @param {boolean} [options.keepAlive=true] TCP Connection keep alive enabled
* @param {number} [options.keepAliveInitialDelay=300000] Initial delay before TCP keep alive enabled
* @param {boolean} [options.noDelay=true] TCP Connection no delay
* @param {number} [options.connectionTimeout=30000] TCP Connection timeout setting
* @param {number} [options.socketTimeout=360000] TCP Socket timeout setting
* @param {boolean} [options.singleBufferSerializtion=true] Serialize into single buffer, trade of peak memory for serialization speed
* @param {boolean} [options.ssl=false] Use SSL for connection
* @param {boolean|function} [options.checkServerIdentity=true] Ensure we check server identify during SSL, set to false to disable checking. Only works for Node 0.12.x or higher. You can pass in a boolean or your own checkServerIdentity override function.
* @param {Buffer} [options.ca] SSL Certificate store binary buffer
* @param {Buffer} [options.crl] SSL Certificate revocation store binary buffer
* @param {Buffer} [options.cert] SSL Certificate binary buffer
* @param {Buffer} [options.key] SSL Key file binary buffer
* @param {string} [options.passphrase] SSL Certificate pass phrase
* @param {boolean} [options.rejectUnauthorized=true] Reject unauthorized server certificates
* @param {boolean} [options.promoteLongs=true] Convert Long values from the db into Numbers if they fit into 53 bits
* @param {boolean} [options.promoteValues=true] Promotes BSON values to native types where possible, set to false to only receive wrapper types.
* @param {boolean} [options.promoteBuffers=false] Promotes Binary BSON values to native Node Buffers.
* A class representing a single connection to a MongoDB server
*
* @fires Connection#connect

@@ -75,140 +55,227 @@ * @fires Connection#close

* @fires Connection#parseError
* @return {Connection} A cursor instance
* @fires Connection#message
*/
var Connection = function(messageHandler, options) {
// Add event listener
EventEmitter.call(this);
// Set empty if no options passed
this.options = options || {};
// Identification information
this.id = _id++;
// Logger instance
this.logger = Logger('Connection', options);
// No bson parser passed in
if (!options.bson) throw new Error('must pass in valid bson parser');
// Get bson parser
this.bson = options.bson;
// Grouping tag used for debugging purposes
this.tag = options.tag;
// Message handler
this.messageHandler = messageHandler;
class Connection extends EventEmitter {
/**
* Creates a new Connection instance
*
* @param {Socket} socket The socket this connection wraps
* @param {Object} [options] Optional settings
* @param {string} [options.host] The host the socket is connected to
* @param {number} [options.port] The port used for the socket connection
* @param {boolean} [options.keepAlive=true] TCP Connection keep alive enabled
* @param {number} [options.keepAliveInitialDelay=300000] Initial delay before TCP keep alive enabled
* @param {number} [options.connectionTimeout=30000] TCP Connection timeout setting
* @param {number} [options.socketTimeout=360000] TCP Socket timeout setting
* @param {boolean} [options.promoteLongs] Convert Long values from the db into Numbers if they fit into 53 bits
* @param {boolean} [options.promoteValues] Promotes BSON values to native types where possible, set to false to only receive wrapper types.
* @param {boolean} [options.promoteBuffers] Promotes Binary BSON values to native Node Buffers.
*/
constructor(socket, options) {
super();
// Max BSON message size
this.maxBsonMessageSize = options.maxBsonMessageSize || 1024 * 1024 * 16 * 4;
// Debug information
if (this.logger.isDebug())
this.logger.debug(
f(
'creating connection %s with options [%s]',
this.id,
JSON.stringify(debugOptions(debugFields, options))
)
);
options = options || {};
if (!options.bson) {
throw new TypeError('must pass in valid bson parser');
}
// Default options
this.port = options.port || 27017;
this.host = options.host || 'localhost';
this.family = typeof options.family === 'number' ? options.family : void 0;
this.keepAlive = typeof options.keepAlive === 'boolean' ? options.keepAlive : true;
this.keepAliveInitialDelay =
typeof options.keepAliveInitialDelay === 'number' ? options.keepAliveInitialDelay : 300000;
this.noDelay = typeof options.noDelay === 'boolean' ? options.noDelay : true;
this.connectionTimeout =
typeof options.connectionTimeout === 'number' ? options.connectionTimeout : 30000;
this.socketTimeout = typeof options.socketTimeout === 'number' ? options.socketTimeout : 360000;
this.id = _id++;
this.options = options;
this.logger = Logger('Connection', options);
this.bson = options.bson;
this.tag = options.tag;
this.maxBsonMessageSize = options.maxBsonMessageSize || DEFAULT_MAX_BSON_MESSAGE_SIZE;
// Is the keepAliveInitialDelay > socketTimeout set it to half of socketTimeout
if (this.keepAliveInitialDelay > this.socketTimeout) {
this.keepAliveInitialDelay = Math.round(this.socketTimeout / 2);
this.port = options.port || 27017;
this.host = options.host || 'localhost';
this.socketTimeout = typeof options.socketTimeout === 'number' ? options.socketTimeout : 360000;
// These values are inspected directly in tests, but maybe not necessary to keep around
this.keepAlive = typeof options.keepAlive === 'boolean' ? options.keepAlive : true;
this.keepAliveInitialDelay =
typeof options.keepAliveInitialDelay === 'number' ? options.keepAliveInitialDelay : 300000;
this.connectionTimeout =
typeof options.connectionTimeout === 'number' ? options.connectionTimeout : 30000;
if (this.keepAliveInitialDelay > this.socketTimeout) {
this.keepAliveInitialDelay = Math.round(this.socketTimeout / 2);
}
// Debug information
if (this.logger.isDebug()) {
this.logger.debug(
`creating connection ${this.id} with options [${JSON.stringify(
debugOptions(DEBUG_FIELDS, options)
)}]`
);
}
// Response options
this.responseOptions = {
promoteLongs: typeof options.promoteLongs === 'boolean' ? options.promoteLongs : true,
promoteValues: typeof options.promoteValues === 'boolean' ? options.promoteValues : true,
promoteBuffers: typeof options.promoteBuffers === 'boolean' ? options.promoteBuffers : false
};
// Flushing
this.flushing = false;
this.queue = [];
// Internal state
this.writeStream = null;
this.destroyed = false;
// Create hash method
const hash = crypto.createHash('sha1');
hash.update(this.address);
this.hashedName = hash.digest('hex');
// All operations in flight on the connection
this.workItems = [];
// setup socket
this.socket = socket;
this.socket.once('error', errorHandler(this));
this.socket.once('timeout', timeoutHandler(this));
this.socket.once('close', closeHandler(this));
this.socket.on('data', dataHandler(this));
if (connectionAccounting) {
addConnection(this.id, this);
}
}
// If connection was destroyed
this.destroyed = false;
setSocketTimeout(value) {
if (this.socket) {
this.socket.setTimeout(value);
}
}
// Check if we have a domain socket
this.domainSocket = this.host.indexOf('/') !== -1;
resetSocketTimeout() {
if (this.socket) {
this.socket.setTimeout(this.socketTimeout);
}
}
// Serialize commands using function
this.singleBufferSerializtion =
typeof options.singleBufferSerializtion === 'boolean' ? options.singleBufferSerializtion : true;
this.serializationFunction = this.singleBufferSerializtion ? 'toBinUnified' : 'toBin';
static enableConnectionAccounting(spy) {
if (spy) {
connectionAccountingSpy = spy;
}
// SSL options
this.ca = options.ca || null;
this.crl = options.crl || null;
this.cert = options.cert || null;
this.key = options.key || null;
this.passphrase = options.passphrase || null;
this.ciphers = options.ciphers || null;
this.ecdhCurve = options.ecdhCurve || null;
this.ssl = typeof options.ssl === 'boolean' ? options.ssl : false;
this.rejectUnauthorized =
typeof options.rejectUnauthorized === 'boolean' ? options.rejectUnauthorized : true;
this.checkServerIdentity =
typeof options.checkServerIdentity === 'boolean' ||
typeof options.checkServerIdentity === 'function'
? options.checkServerIdentity
: true;
connectionAccounting = true;
connections = {};
}
// If ssl not enabled
if (!this.ssl) this.rejectUnauthorized = false;
static disableConnectionAccounting() {
connectionAccounting = false;
connectionAccountingSpy = undefined;
}
// Response options
this.responseOptions = {
promoteLongs: typeof options.promoteLongs === 'boolean' ? options.promoteLongs : true,
promoteValues: typeof options.promoteValues === 'boolean' ? options.promoteValues : true,
promoteBuffers: typeof options.promoteBuffers === 'boolean' ? options.promoteBuffers : false
};
static connections() {
return connections;
}
// Flushing
this.flushing = false;
this.queue = [];
get address() {
return `${this.host}:${this.port}`;
}
// Internal state
this.connection = null;
this.writeStream = null;
/**
* Unref this connection
* @method
* @return {boolean}
*/
unref() {
if (this.socket == null) {
this.once('connect', () => this.socket.unref());
return;
}
// Create hash method
var hash = crypto.createHash('sha1');
hash.update(f('%s:%s', this.host, this.port));
this.socket.unref();
}
// Create a hash name
this.hashedName = hash.digest('hex');
/**
* Destroy connection
* @method
*/
destroy() {
if (connectionAccounting) {
deleteConnection(this.id);
}
// All operations in flight on the connection
this.workItems = [];
};
if (this.socket) {
// Catch posssible exception thrown by node 0.10.x
try {
this.socket.end();
} catch (err) {} // eslint-disable-line
inherits(Connection, EventEmitter);
this.socket.destroy();
}
Connection.prototype.setSocketTimeout = function(value) {
if (this.connection) {
this.connection.setTimeout(value);
this.destroyed = true;
}
};
Connection.prototype.resetSocketTimeout = function() {
if (this.connection) {
this.connection.setTimeout(this.socketTimeout);
/**
* Write to connection
* @method
* @param {Command} command Command to write out need to implement toBin and toBinUnified
*/
write(buffer) {
// Debug Log
if (this.logger.isDebug()) {
if (!Array.isArray(buffer)) {
this.logger.debug(`writing buffer [${buffer.toString('hex')}] to ${this.address}`);
} else {
for (let i = 0; i < buffer.length; i++)
this.logger.debug(`writing buffer [${buffer[i].toString('hex')}] to ${this.address}`);
}
}
// Double check that the connection is not destroyed
if (this.socket.destroyed === false) {
// Write out the command
if (!Array.isArray(buffer)) {
this.socket.write(buffer, 'binary');
return true;
}
// Iterate over all buffers and write them in order to the socket
for (let i = 0; i < buffer.length; i++) {
this.socket.write(buffer[i], 'binary');
}
return true;
}
// Connection is destroyed return write failed
return false;
}
};
Connection.enableConnectionAccounting = function(spy) {
if (spy) {
connectionAccountingSpy = spy;
/**
* Return id of connection as a string
* @method
* @return {string}
*/
toString() {
return '' + this.id;
}
connectionAccounting = true;
connections = {};
};
/**
* Return json object of connection
* @method
* @return {object}
*/
toJSON() {
return { id: this.id, host: this.host, port: this.port };
}
Connection.disableConnectionAccounting = function() {
connectionAccounting = false;
connectionAccountingSpy = undefined;
};
/**
* Is the connection connected
* @method
* @return {boolean}
*/
isConnected() {
if (this.destroyed) return false;
return !this.socket.destroyed && this.socket.writable;
}
}
Connection.connections = function() {
return connections;
};
function deleteConnection(id) {

@@ -234,95 +301,112 @@ // console.log("=== deleted connection " + id + " :: " + (connections[id] ? connections[id].port : ''))

// Connection handlers
var errorHandler = function(self) {
function errorHandler(conn) {
return function(err) {
if (connectionAccounting) deleteConnection(self.id);
if (connectionAccounting) deleteConnection(conn.id);
// Debug information
if (self.logger.isDebug())
self.logger.debug(
f(
'connection %s for [%s:%s] errored out with [%s]',
self.id,
self.host,
self.port,
JSON.stringify(err)
)
if (conn.logger.isDebug()) {
conn.logger.debug(
`connection ${conn.id} for [${conn.address}] errored out with [${JSON.stringify(err)}]`
);
// Emit the error
if (self.listeners('error').length > 0) self.emit('error', new MongoNetworkError(err), self);
}
conn.emit('error', new MongoNetworkError(err), conn);
};
};
}
var timeoutHandler = function(self) {
function timeoutHandler(conn) {
return function() {
if (connectionAccounting) deleteConnection(self.id);
// Debug information
if (self.logger.isDebug())
self.logger.debug(f('connection %s for [%s:%s] timed out', self.id, self.host, self.port));
// Emit timeout error
self.emit(
if (connectionAccounting) deleteConnection(conn.id);
if (conn.logger.isDebug()) {
conn.logger.debug(`connection ${conn.id} for [${conn.address}] timed out`);
}
conn.emit(
'timeout',
new MongoNetworkError(f('connection %s to %s:%s timed out', self.id, self.host, self.port)),
self
new MongoNetworkError(`connection ${conn.id} to ${conn.address} timed out`),
conn
);
};
};
}
var closeHandler = function(self) {
function closeHandler(conn) {
return function(hadError) {
if (connectionAccounting) deleteConnection(self.id);
// Debug information
if (self.logger.isDebug())
self.logger.debug(f('connection %s with for [%s:%s] closed', self.id, self.host, self.port));
if (connectionAccounting) deleteConnection(conn.id);
// Emit close event
if (conn.logger.isDebug()) {
conn.logger.debug(`connection ${conn.id} with for [${conn.address}] closed`);
}
if (!hadError) {
self.emit(
conn.emit(
'close',
new MongoNetworkError(f('connection %s to %s:%s closed', self.id, self.host, self.port)),
self
new MongoNetworkError(`connection ${conn.id} to ${conn.address} closed`),
conn
);
}
};
};
}
// Handle a message once it is received
var emitMessageHandler = function(self, message) {
var msgHeader = parseHeader(message);
if (msgHeader.opCode === OP_COMPRESSED) {
msgHeader.fromCompressed = true;
var index = MESSAGE_HEADER_SIZE;
msgHeader.opCode = message.readInt32LE(index);
index += 4;
msgHeader.length = message.readInt32LE(index);
index += 4;
var compressorID = message[index];
index++;
decompress(compressorID, message.slice(index), function(err, decompressedMsgBody) {
if (err) {
throw err;
}
if (decompressedMsgBody.length !== msgHeader.length) {
throw new Error(
'Decompressing a compressed message from the server failed. The message is corrupt.'
);
}
self.messageHandler(
new Response(self.bson, message, msgHeader, decompressedMsgBody, self.responseOptions),
self
);
});
} else {
self.messageHandler(
new Response(
self.bson,
function processMessage(conn, message) {
const msgHeader = parseHeader(message);
if (msgHeader.opCode !== OP_COMPRESSED) {
const ResponseConstructor = msgHeader.opCode === OP_MSG ? BinMsg : Response;
conn.emit(
'message',
new ResponseConstructor(
conn.bson,
message,
msgHeader,
message.slice(MESSAGE_HEADER_SIZE),
self.responseOptions
conn.responseOptions
),
self
conn
);
return;
}
};
var dataHandler = function(self) {
msgHeader.fromCompressed = true;
let index = MESSAGE_HEADER_SIZE;
msgHeader.opCode = message.readInt32LE(index);
index += 4;
msgHeader.length = message.readInt32LE(index);
index += 4;
const compressorID = message[index];
index++;
decompress(compressorID, message.slice(index), (err, decompressedMsgBody) => {
if (err) {
conn.emit('error', err);
return;
}
if (decompressedMsgBody.length !== msgHeader.length) {
conn.emit(
'error',
new MongoError(
'Decompressing a compressed message from the server failed. The message is corrupt.'
)
);
return;
}
const ResponseConstructor = msgHeader.opCode === OP_MSG ? BinMsg : Response;
conn.emit(
'message',
new ResponseConstructor(
conn.bson,
message,
msgHeader,
decompressedMsgBody,
conn.responseOptions
),
conn
);
});
}
function dataHandler(conn) {
return function(data) {

@@ -332,11 +416,11 @@ // Parse until we are done with the data

// If we still have bytes to read on the current message
if (self.bytesRead > 0 && self.sizeOfMessage > 0) {
if (conn.bytesRead > 0 && conn.sizeOfMessage > 0) {
// Calculate the amount of remaining bytes
var remainingBytesToRead = self.sizeOfMessage - self.bytesRead;
const remainingBytesToRead = conn.sizeOfMessage - conn.bytesRead;
// Check if the current chunk contains the rest of the message
if (remainingBytesToRead > data.length) {
// Copy the new data into the exiting buffer (should have been allocated when we know the message size)
data.copy(self.buffer, self.bytesRead);
data.copy(conn.buffer, conn.bytesRead);
// Adjust the number of bytes read so it point to the correct index in the buffer
self.bytesRead = self.bytesRead + data.length;
conn.bytesRead = conn.bytesRead + data.length;

@@ -347,3 +431,3 @@ // Reset state of buffer

// Copy the missing part of the data into our current buffer
data.copy(self.buffer, self.bytesRead, 0, remainingBytesToRead);
data.copy(conn.buffer, conn.bytesRead, 0, remainingBytesToRead);
// Slice the overflow into a new buffer that we will then re-parse

@@ -353,25 +437,10 @@ data = data.slice(remainingBytesToRead);

// Emit current complete message
try {
var emitBuffer = self.buffer;
// Reset state of buffer
self.buffer = null;
self.sizeOfMessage = 0;
self.bytesRead = 0;
self.stubBuffer = null;
const emitBuffer = conn.buffer;
// Reset state of buffer
conn.buffer = null;
conn.sizeOfMessage = 0;
conn.bytesRead = 0;
conn.stubBuffer = null;
emitMessageHandler(self, emitBuffer);
} catch (err) {
var errorObject = {
err: 'socketHandler',
trace: err,
bin: self.buffer,
parseState: {
sizeOfMessage: self.sizeOfMessage,
bytesRead: self.bytesRead,
stubBuffer: self.stubBuffer
}
};
// We got a parse Error fire it off then keep going
self.emit('parseError', errorObject, self);
}
processMessage(conn, emitBuffer);
}

@@ -381,9 +450,9 @@ } else {

// size of the message (< 4 bytes)
if (self.stubBuffer != null && self.stubBuffer.length > 0) {
if (conn.stubBuffer != null && conn.stubBuffer.length > 0) {
// If we have enough bytes to determine the message size let's do it
if (self.stubBuffer.length + data.length > 4) {
if (conn.stubBuffer.length + data.length > 4) {
// Prepad the data
var newData = Buffer.alloc(self.stubBuffer.length + data.length);
self.stubBuffer.copy(newData, 0);
data.copy(newData, self.stubBuffer.length);
const newData = Buffer.alloc(conn.stubBuffer.length + data.length);
conn.stubBuffer.copy(newData, 0);
data.copy(newData, conn.stubBuffer.length);
// Reassign for parsing

@@ -393,13 +462,13 @@ data = newData;

// Reset state of buffer
self.buffer = null;
self.sizeOfMessage = 0;
self.bytesRead = 0;
self.stubBuffer = null;
conn.buffer = null;
conn.sizeOfMessage = 0;
conn.bytesRead = 0;
conn.stubBuffer = null;
} else {
// Add the the bytes to the stub buffer
var newStubBuffer = Buffer.alloc(self.stubBuffer.length + data.length);
const newStubBuffer = Buffer.alloc(conn.stubBuffer.length + data.length);
// Copy existing stub buffer
self.stubBuffer.copy(newStubBuffer, 0);
conn.stubBuffer.copy(newStubBuffer, 0);
// Copy missing part of the data
data.copy(newStubBuffer, self.stubBuffer.length);
data.copy(newStubBuffer, conn.stubBuffer.length);
// Exit parsing loop

@@ -411,18 +480,17 @@ data = Buffer.alloc(0);

// Retrieve the message size
// var sizeOfMessage = data.readUInt32LE(0);
var sizeOfMessage = data[0] | (data[1] << 8) | (data[2] << 16) | (data[3] << 24);
const sizeOfMessage = data[0] | (data[1] << 8) | (data[2] << 16) | (data[3] << 24);
// If we have a negative sizeOfMessage emit error and return
if (sizeOfMessage < 0 || sizeOfMessage > self.maxBsonMessageSize) {
errorObject = {
if (sizeOfMessage < 0 || sizeOfMessage > conn.maxBsonMessageSize) {
const errorObject = {
err: 'socketHandler',
trace: '',
bin: self.buffer,
bin: conn.buffer,
parseState: {
sizeOfMessage: sizeOfMessage,
bytesRead: self.bytesRead,
stubBuffer: self.stubBuffer
bytesRead: conn.bytesRead,
stubBuffer: conn.stubBuffer
}
};
// We got a parse Error fire it off then keep going
self.emit('parseError', errorObject, self);
conn.emit('parseError', errorObject, conn);
return;

@@ -434,14 +502,14 @@ }

sizeOfMessage > 4 &&
sizeOfMessage < self.maxBsonMessageSize &&
sizeOfMessage < conn.maxBsonMessageSize &&
sizeOfMessage > data.length
) {
self.buffer = Buffer.alloc(sizeOfMessage);
conn.buffer = Buffer.alloc(sizeOfMessage);
// Copy all the data into the buffer
data.copy(self.buffer, 0);
data.copy(conn.buffer, 0);
// Update bytes read
self.bytesRead = data.length;
conn.bytesRead = data.length;
// Update sizeOfMessage
self.sizeOfMessage = sizeOfMessage;
conn.sizeOfMessage = sizeOfMessage;
// Ensure stub buffer is null
self.stubBuffer = null;
conn.stubBuffer = null;
// Exit parsing loop

@@ -451,21 +519,17 @@ data = Buffer.alloc(0);

sizeOfMessage > 4 &&
sizeOfMessage < self.maxBsonMessageSize &&
sizeOfMessage < conn.maxBsonMessageSize &&
sizeOfMessage === data.length
) {
try {
emitBuffer = data;
// Reset state of buffer
self.buffer = null;
self.sizeOfMessage = 0;
self.bytesRead = 0;
self.stubBuffer = null;
// Exit parsing loop
data = Buffer.alloc(0);
// Emit the message
emitMessageHandler(self, emitBuffer);
} catch (err) {
self.emit('parseError', err, self);
}
} else if (sizeOfMessage <= 4 || sizeOfMessage > self.maxBsonMessageSize) {
errorObject = {
const emitBuffer = data;
// Reset state of buffer
conn.buffer = null;
conn.sizeOfMessage = 0;
conn.bytesRead = 0;
conn.stubBuffer = null;
// Exit parsing loop
data = Buffer.alloc(0);
// Emit the message
processMessage(conn, emitBuffer);
} else if (sizeOfMessage <= 4 || sizeOfMessage > conn.maxBsonMessageSize) {
const errorObject = {
err: 'socketHandler',

@@ -482,28 +546,28 @@ trace: null,

// We got a parse Error fire it off then keep going
self.emit('parseError', errorObject, self);
conn.emit('parseError', errorObject, conn);
// Clear out the state of the parser
self.buffer = null;
self.sizeOfMessage = 0;
self.bytesRead = 0;
self.stubBuffer = null;
conn.buffer = null;
conn.sizeOfMessage = 0;
conn.bytesRead = 0;
conn.stubBuffer = null;
// Exit parsing loop
data = Buffer.alloc(0);
} else {
emitBuffer = data.slice(0, sizeOfMessage);
const emitBuffer = data.slice(0, sizeOfMessage);
// Reset state of buffer
self.buffer = null;
self.sizeOfMessage = 0;
self.bytesRead = 0;
self.stubBuffer = null;
conn.buffer = null;
conn.sizeOfMessage = 0;
conn.bytesRead = 0;
conn.stubBuffer = null;
// Copy rest of message
data = data.slice(sizeOfMessage);
// Emit the message
emitMessageHandler(self, emitBuffer);
processMessage(conn, emitBuffer);
}
} else {
// Create a buffer that contains the space for the non-complete message
self.stubBuffer = Buffer.alloc(data.length);
conn.stubBuffer = Buffer.alloc(data.length);
// Copy the data to the stub buffer
data.copy(self.stubBuffer, 0);
data.copy(conn.stubBuffer, 0);
// Exit parsing loop

@@ -516,270 +580,5 @@ data = Buffer.alloc(0);

};
};
// List of socket level valid ssl options
var legalSslSocketOptions = [
'pfx',
'key',
'passphrase',
'cert',
'ca',
'ciphers',
'NPNProtocols',
'ALPNProtocols',
'servername',
'ecdhCurve',
'secureProtocol',
'secureContext',
'session',
'minDHSize'
];
function merge(options1, options2) {
// Merge in any allowed ssl options
for (var name in options2) {
if (options2[name] != null && legalSslSocketOptions.indexOf(name) !== -1) {
options1[name] = options2[name];
}
}
}
function makeSSLConnection(self, _options) {
let sslOptions = {
socket: self.connection,
rejectUnauthorized: self.rejectUnauthorized
};
// Merge in options
merge(sslOptions, self.options);
merge(sslOptions, _options);
// Set options for ssl
if (self.ca) sslOptions.ca = self.ca;
if (self.crl) sslOptions.crl = self.crl;
if (self.cert) sslOptions.cert = self.cert;
if (self.key) sslOptions.key = self.key;
if (self.passphrase) sslOptions.passphrase = self.passphrase;
// Override checkServerIdentity behavior
if (self.checkServerIdentity === false) {
// Skip the identiy check by retuning undefined as per node documents
// https://nodejs.org/api/tls.html#tls_tls_connect_options_callback
sslOptions.checkServerIdentity = function() {
return undefined;
};
} else if (typeof self.checkServerIdentity === 'function') {
sslOptions.checkServerIdentity = self.checkServerIdentity;
}
// Set default sni servername to be the same as host
if (sslOptions.servername == null) {
sslOptions.servername = self.host;
}
// Attempt SSL connection
const connection = tls.connect(self.port, self.host, sslOptions, function() {
// Error on auth or skip
if (connection.authorizationError && self.rejectUnauthorized) {
return self.emit('error', connection.authorizationError, self, { ssl: true });
}
// Set socket timeout instead of connection timeout
connection.setTimeout(self.socketTimeout);
// We are done emit connect
self.emit('connect', self);
});
// Set the options for the connection
connection.setKeepAlive(self.keepAlive, self.keepAliveInitialDelay);
connection.setTimeout(self.connectionTimeout);
connection.setNoDelay(self.noDelay);
return connection;
}
function makeUnsecureConnection(self, family) {
// Create new connection instance
let connection_options;
if (self.domainSocket) {
connection_options = { path: self.host };
} else {
connection_options = { port: self.port, host: self.host };
connection_options.family = family;
}
const connection = net.createConnection(connection_options);
// Set the options for the connection
connection.setKeepAlive(self.keepAlive, self.keepAliveInitialDelay);
connection.setTimeout(self.connectionTimeout);
connection.setNoDelay(self.noDelay);
connection.once('connect', function() {
// Set socket timeout instead of connection timeout
connection.setTimeout(self.socketTimeout);
// Emit connect event
self.emit('connect', self);
});
return connection;
}
function doConnect(self, family, _options, _errorHandler) {
self.connection = self.ssl
? makeSSLConnection(self, _options)
: makeUnsecureConnection(self, family);
// Add handlers for events
self.connection.once('error', _errorHandler);
self.connection.once('timeout', timeoutHandler(self));
self.connection.once('close', closeHandler(self));
self.connection.on('data', dataHandler(self));
}
/**
* Connect
* @method
*/
Connection.prototype.connect = function(_options) {
_options = _options || {};
// Set the connections
if (connectionAccounting) addConnection(this.id, this);
// Check if we are overriding the promoteLongs
if (typeof _options.promoteLongs === 'boolean') {
this.responseOptions.promoteLongs = _options.promoteLongs;
this.responseOptions.promoteValues = _options.promoteValues;
this.responseOptions.promoteBuffers = _options.promoteBuffers;
}
const _errorHandler = errorHandler(this);
if (this.family !== void 0) {
return doConnect(this, this.family, _options, _errorHandler);
}
return doConnect(this, 6, _options, err => {
if (this.logger.isDebug()) {
this.logger.debug(
f(
'connection %s for [%s:%s] errored out with [%s]',
this.id,
this.host,
this.port,
JSON.stringify(err)
)
);
}
// clean up existing event handlers
this.connection.removeAllListeners('error');
this.connection.removeAllListeners('timeout');
this.connection.removeAllListeners('close');
this.connection.removeAllListeners('data');
this.connection = undefined;
return doConnect(this, 4, _options, _errorHandler);
});
};
/**
* Unref this connection
* @method
* @return {boolean}
*/
Connection.prototype.unref = function() {
if (this.connection) this.connection.unref();
else {
var self = this;
this.once('connect', function() {
self.connection.unref();
});
}
};
/**
* Destroy connection
* @method
*/
Connection.prototype.destroy = function() {
// Set the connections
if (connectionAccounting) deleteConnection(this.id);
if (this.connection) {
// Catch posssible exception thrown by node 0.10.x
try {
this.connection.end();
} catch (err) {} // eslint-disable-line
// Destroy connection
this.connection.destroy();
}
this.destroyed = true;
};
/**
* Write to connection
* @method
* @param {Command} command Command to write out need to implement toBin and toBinUnified
*/
Connection.prototype.write = function(buffer) {
var i;
// Debug Log
if (this.logger.isDebug()) {
if (!Array.isArray(buffer)) {
this.logger.debug(
f('writing buffer [%s] to %s:%s', buffer.toString('hex'), this.host, this.port)
);
} else {
for (i = 0; i < buffer.length; i++)
this.logger.debug(
f('writing buffer [%s] to %s:%s', buffer[i].toString('hex'), this.host, this.port)
);
}
}
// Double check that the connection is not destroyed
if (this.connection.destroyed === false) {
// Write out the command
if (!Array.isArray(buffer)) {
this.connection.write(buffer, 'binary');
return true;
}
// Iterate over all buffers and write them in order to the socket
for (i = 0; i < buffer.length; i++) this.connection.write(buffer[i], 'binary');
return true;
}
// Connection is destroyed return write failed
return false;
};
/**
* Return id of connection as a string
* @method
* @return {string}
*/
Connection.prototype.toString = function() {
return '' + this.id;
};
/**
* Return json object of connection
* @method
* @return {object}
*/
Connection.prototype.toJSON = function() {
return { id: this.id, host: this.host, port: this.port };
};
/**
* Is the connection connected
* @method
* @return {boolean}
*/
Connection.prototype.isConnected = function() {
if (this.destroyed) return false;
return !this.connection.destroyed && this.connection.writable;
};
/**
* A server connect event, used to verify that the connection is up and running

@@ -819,2 +618,9 @@ *

/**
* An event emitted each time the connection receives a parsed message from the wire
*
* @event Connection#message
* @type {Connection}
*/
module.exports = Connection;

@@ -5,3 +5,2 @@ 'use strict';

const EventEmitter = require('events').EventEmitter;
const Connection = require('./connection');
const MongoError = require('../error').MongoError;

@@ -13,4 +12,6 @@ const MongoNetworkError = require('../error').MongoNetworkError;

const Query = require('./commands').Query;
const Msg = require('./msg').Msg;
const CommandResult = require('./command_result');
const MESSAGE_HEADER_SIZE = require('../wireprotocol/shared').MESSAGE_HEADER_SIZE;
const COMPRESSION_DETAILS_SIZE = require('../wireprotocol/shared').COMPRESSION_DETAILS_SIZE;
const opcodes = require('../wireprotocol/shared').opcodes;

@@ -22,4 +23,4 @@ const compress = require('../wireprotocol/compression').compress;

const apm = require('./apm');
const defaultAuthProviders = require('../auth/defaultAuthProviders').defaultAuthProviders;
const Buffer = require('safe-buffer').Buffer;
const connect = require('./connect');

@@ -36,2 +37,6 @@ var DISCONNECTED = 'disconnected';

if (topology == null) return false;
if (topology.description) {
return topology.description.maxWireVersion >= 6;
}
return topology.ismaster == null ? false : topology.ismaster.maxWireVersion >= 6;

@@ -142,3 +147,3 @@ }

this.inUseConnections = [];
this.connectingConnections = [];
this.connectingConnections = 0;
// Currently executing

@@ -149,13 +154,5 @@ this.executing = false;

// All the authProviders
this.authProviders = options.authProviders || defaultAuthProviders(options.bson);
// Contains the reconnect connection
this.reconnectConnection = null;
// Are we currently authenticating
this.authenticating = false;
this.loggingout = false;
this.nonAuthenticatedConnections = [];
this.authenticatingTimestamp = null;
// Number of consecutive timeouts caught

@@ -165,2 +162,25 @@ this.numberOfConsecutiveTimeouts = 0;

this.connectionIndex = 0;
// event handlers
const pool = this;
this._messageHandler = messageHandler(this);
this._connectionCloseHandler = function(err) {
const connection = this;
connectionFailureHandler(pool, 'close', err, connection);
};
this._connectionErrorHandler = function(err) {
const connection = this;
connectionFailureHandler(pool, 'error', err, connection);
};
this._connectionTimeoutHandler = function(err) {
const connection = this;
connectionFailureHandler(pool, 'timeout', err, connection);
};
this._connectionParseErrorHandler = function(err) {
const connection = this;
connectionFailureHandler(pool, 'parseError', err, connection);
};
};

@@ -225,119 +245,55 @@

function authenticate(pool, auth, connection, cb) {
if (auth[0] === undefined) return cb(null);
// We need to authenticate the server
var mechanism = auth[0];
var db = auth[1];
// Validate if the mechanism exists
if (!pool.authProviders[mechanism]) {
throw new MongoError(f('authMechanism %s not supported', mechanism));
}
function connectionFailureHandler(pool, event, err, conn) {
if (conn) {
if (conn._connectionFailHandled) return;
conn._connectionFailHandled = true;
conn.destroy();
// Get the provider
var provider = pool.authProviders[mechanism];
// Authenticate using the provided mechanism
provider.auth.apply(provider, [write(pool), [connection], db].concat(auth.slice(2)).concat([cb]));
}
// The write function used by the authentication mechanism (bypasses external)
function write(self) {
return function(connection, command, callback) {
// Get the raw buffer
// Ensure we stop auth if pool was destroyed
if (self.state === DESTROYED || self.state === DESTROYING) {
return callback(new MongoError('pool destroyed'));
}
// Set the connection workItem callback
connection.workItems.push({
cb: callback,
command: true,
requestId: command.requestId
});
// Write the buffer out to the connection
connection.write(command.toBin());
};
}
function reauthenticate(pool, connection, cb) {
// Authenticate
function authenticateAgainstProvider(pool, connection, providers, cb) {
// Finished re-authenticating against providers
if (providers.length === 0) return cb();
// Get the provider name
var provider = pool.authProviders[providers.pop()];
// Auth provider
provider.reauthenticate(write(pool), [connection], function(err) {
// We got an error return immediately
if (err) return cb(err);
// Continue authenticating the connection
authenticateAgainstProvider(pool, connection, providers, cb);
});
}
// Start re-authenticating process
authenticateAgainstProvider(pool, connection, Object.keys(pool.authProviders), cb);
}
function connectionFailureHandler(self, event) {
return function(err) {
if (this._connectionFailHandled) return;
this._connectionFailHandled = true;
// Destroy the connection
this.destroy();
// Remove the connection
removeConnection(self, this);
removeConnection(pool, conn);
// Flush all work Items on this connection
while (this.workItems.length > 0) {
var workItem = this.workItems.shift();
while (conn.workItems.length > 0) {
const workItem = conn.workItems.shift();
if (workItem.cb) workItem.cb(err);
}
}
// Did we catch a timeout, increment the numberOfConsecutiveTimeouts
if (event === 'timeout') {
self.numberOfConsecutiveTimeouts = self.numberOfConsecutiveTimeouts + 1;
// Did we catch a timeout, increment the numberOfConsecutiveTimeouts
if (event === 'timeout') {
pool.numberOfConsecutiveTimeouts = pool.numberOfConsecutiveTimeouts + 1;
// Have we timed out more than reconnectTries in a row ?
// Force close the pool as we are trying to connect to tcp sink hole
if (self.numberOfConsecutiveTimeouts > self.options.reconnectTries) {
self.numberOfConsecutiveTimeouts = 0;
// Destroy all connections and pool
self.destroy(true);
// Emit close event
return self.emit('close', self);
}
// Have we timed out more than reconnectTries in a row ?
// Force close the pool as we are trying to connect to tcp sink hole
if (pool.numberOfConsecutiveTimeouts > pool.options.reconnectTries) {
pool.numberOfConsecutiveTimeouts = 0;
// Destroy all connections and pool
pool.destroy(true);
// Emit close event
return pool.emit('close', pool);
}
}
// No more socket available propegate the event
if (self.socketCount() === 0) {
if (self.state !== DESTROYED && self.state !== DESTROYING) {
stateTransition(self, DISCONNECTED);
}
// Do not emit error events, they are always close events
// do not trigger the low level error handler in node
event = event === 'error' ? 'close' : event;
self.emit(event, err);
// No more socket available propegate the event
if (pool.socketCount() === 0) {
if (pool.state !== DESTROYED && pool.state !== DESTROYING) {
stateTransition(pool, DISCONNECTED);
}
// Start reconnection attempts
if (!self.reconnectId && self.options.reconnect) {
self.reconnectId = setTimeout(attemptReconnect(self), self.options.reconnectInterval);
}
// Do not emit error events, they are always close events
// do not trigger the low level error handler in node
event = event === 'error' ? 'close' : event;
pool.emit(event, err);
}
// Do we need to do anything to maintain the minimum pool size
const totalConnections =
self.availableConnections.length +
self.connectingConnections.length +
self.inUseConnections.length;
// Start reconnection attempts
if (!pool.reconnectId && pool.options.reconnect) {
pool.reconnectId = setTimeout(attemptReconnect(pool), pool.options.reconnectInterval);
}
if (totalConnections < self.minSize) {
_createConnection(self);
}
};
// Do we need to do anything to maintain the minimum pool size
const totalConnections = totalConnectionCount(pool);
if (totalConnections < pool.minSize) {
_createConnection(pool);
}
}

@@ -356,16 +312,14 @@

// If we have failure schedule a retry
function _connectionFailureHandler(self) {
return function() {
if (this._connectionFailHandled) return;
this._connectionFailHandled = true;
// Destroy the connection
this.destroy();
// Count down the number of reconnects
self.connectingConnections++;
connect(self.options, (err, connection) => {
self.connectingConnections--;
if (err) {
if (self.logger.isDebug()) {
self.logger.debug(`connection attempt failed with error [${JSON.stringify(err)}]`);
}
self.retriesLeft = self.retriesLeft - 1;
// How many retries are left
if (self.retriesLeft <= 0) {
// Destroy the instance
self.destroy();
// Emit close event
self.emit(

@@ -384,57 +338,24 @@ 'reconnectFailed',

}
};
}
// Got a connect handler
function _connectHandler(self) {
return function() {
// Assign
var connection = this;
return;
}
// Pool destroyed stop the connection
if (self.state === DESTROYED || self.state === DESTROYING) {
return connection.destroy();
}
if (self.state === DESTROYED || self.state === DESTROYING) {
return connection.destroy();
}
// Clear out all handlers
handlers.forEach(function(event) {
connection.removeAllListeners(event);
});
self.reconnectId = null;
handlers.forEach(event => connection.removeAllListeners(event));
connection.on('error', self._connectionErrorHandler);
connection.on('close', self._connectionCloseHandler);
connection.on('timeout', self._connectionTimeoutHandler);
connection.on('parseError', self._connectionParseErrorHandler);
connection.on('message', self._messageHandler);
// Reset reconnect id
self.reconnectId = null;
// Apply pool connection handlers
connection.on('error', connectionFailureHandler(self, 'error'));
connection.on('close', connectionFailureHandler(self, 'close'));
connection.on('timeout', connectionFailureHandler(self, 'timeout'));
connection.on('parseError', connectionFailureHandler(self, 'parseError'));
// Apply any auth to the connection
reauthenticate(self, this, function() {
// Reset retries
self.retriesLeft = self.options.reconnectTries;
// Push to available connections
self.availableConnections.push(connection);
// Set the reconnectConnection to null
self.reconnectConnection = null;
// Emit reconnect event
self.emit('reconnect', self);
// Trigger execute to start everything up again
_execute(self)();
});
};
}
// Create a connection
self.reconnectConnection = new Connection(messageHandler(self), self.options);
// Add handlers
self.reconnectConnection.on('close', _connectionFailureHandler(self, 'close'));
self.reconnectConnection.on('error', _connectionFailureHandler(self, 'error'));
self.reconnectConnection.on('timeout', _connectionFailureHandler(self, 'timeout'));
self.reconnectConnection.on('parseError', _connectionFailureHandler(self, 'parseError'));
// On connection
self.reconnectConnection.on('connect', _connectHandler(self));
// Attempt connection
self.reconnectConnection.connect();
self.retriesLeft = self.options.reconnectTries;
self.availableConnections.push(connection);
self.reconnectConnection = null;
self.emit('reconnect', self);
_execute(self)();
});
};

@@ -467,2 +388,6 @@ }

if (workItem && workItem.monitoring) {
moveConnectionBetween(connection, self.inUseConnections, self.availableConnections);
}
// Reset timeout counter

@@ -489,52 +414,2 @@ self.numberOfConsecutiveTimeouts = 0;

// Authenticate any straggler connections
function authenticateStragglers(self, connection, callback) {
// Get any non authenticated connections
var connections = self.nonAuthenticatedConnections.slice(0);
var nonAuthenticatedConnections = self.nonAuthenticatedConnections;
self.nonAuthenticatedConnections = [];
// Establish if the connection need to be authenticated
// Add to authentication list if
// 1. we were in an authentication process when the operation was executed
// 2. our current authentication timestamp is from the workItem one, meaning an auth has happened
if (
connection.workItems.length === 1 &&
(connection.workItems[0].authenticating === true ||
(typeof connection.workItems[0].authenticatingTimestamp === 'number' &&
connection.workItems[0].authenticatingTimestamp !== self.authenticatingTimestamp))
) {
// Add connection to the list
connections.push(connection);
}
// No connections need to be re-authenticated
if (connections.length === 0) {
// Release the connection back to the pool
moveConnectionBetween(connection, self.inUseConnections, self.availableConnections);
// Finish
return callback();
}
// Apply re-authentication to all connections before releasing back to pool
var connectionCount = connections.length;
// Authenticate all connections
for (var i = 0; i < connectionCount; i++) {
reauthenticate(self, connections[i], function() {
connectionCount = connectionCount - 1;
if (connectionCount === 0) {
// Put non authenticated connections in available connections
self.availableConnections = self.availableConnections.concat(
nonAuthenticatedConnections
);
// Release the connection back to the pool
moveConnectionBetween(connection, self.inUseConnections, self.availableConnections);
// Return
callback();
}
});
}
}
function handleOperationCallback(self, cb, err, result) {

@@ -552,71 +427,67 @@ // No domain enabled

authenticateStragglers(self, connection, function() {
// Keep executing, ensure current message handler does not stop execution
if (!self.executing) {
process.nextTick(function() {
_execute(self)();
});
// Keep executing, ensure current message handler does not stop execution
if (!self.executing) {
process.nextTick(function() {
_execute(self)();
});
}
// Time to dispatch the message if we have a callback
if (workItem && !workItem.immediateRelease) {
try {
// Parse the message according to the provided options
message.parse(workItem);
} catch (err) {
return handleOperationCallback(self, workItem.cb, new MongoError(err));
}
// Time to dispatch the message if we have a callback
if (workItem && !workItem.immediateRelease) {
try {
// Parse the message according to the provided options
message.parse(workItem);
} catch (err) {
return handleOperationCallback(self, workItem.cb, new MongoError(err));
}
// Look for clusterTime, operationTime, and recoveryToken and update them if necessary
if (message.documents[0]) {
const session = workItem.session;
const document = message.documents[0];
if (document.$clusterTime) {
const $clusterTime = document.$clusterTime;
self.topology.clusterTime = $clusterTime;
// Look for clusterTime, and operationTime and update them if necessary
if (message.documents[0]) {
if (message.documents[0].$clusterTime) {
const $clusterTime = message.documents[0].$clusterTime;
self.topology.clusterTime = $clusterTime;
if (workItem.session != null) {
resolveClusterTime(workItem.session, $clusterTime);
}
if (session != null) {
resolveClusterTime(session, $clusterTime);
}
}
if (
message.documents[0].operationTime &&
workItem.session &&
workItem.session.supports.causalConsistency
) {
workItem.session.advanceOperationTime(message.documents[0].operationTime);
}
if (document.operationTime && session && session.supports.causalConsistency) {
session.advanceOperationTime(message.documents[0].operationTime);
}
// Establish if we have an error
if (workItem.command && message.documents[0]) {
const responseDoc = message.documents[0];
if (responseDoc.ok === 0 || responseDoc.$err || responseDoc.errmsg || responseDoc.code) {
return handleOperationCallback(self, workItem.cb, new MongoError(responseDoc));
}
if (document.recoveryToken && session && session.inTransaction()) {
session.transaction._recoveryToken = document.recoveryToken;
}
}
if (responseDoc.writeConcernError) {
const err =
responseDoc.ok === 1
? new MongoWriteConcernError(responseDoc.writeConcernError, responseDoc)
: new MongoWriteConcernError(responseDoc.writeConcernError);
return handleOperationCallback(self, workItem.cb, err);
}
// Establish if we have an error
if (workItem.command && message.documents[0]) {
const responseDoc = message.documents[0];
if (responseDoc.ok === 0 || responseDoc.$err || responseDoc.errmsg || responseDoc.code) {
return handleOperationCallback(self, workItem.cb, new MongoError(responseDoc));
}
// Add the connection details
message.hashedName = connection.hashedName;
if (responseDoc.writeConcernError) {
const err =
responseDoc.ok === 1
? new MongoWriteConcernError(responseDoc.writeConcernError, responseDoc)
: new MongoWriteConcernError(responseDoc.writeConcernError);
return handleOperationCallback(self, workItem.cb, err);
}
}
// Return the documents
handleOperationCallback(
self,
workItem.cb,
null,
new CommandResult(
workItem.fullResult ? message : message.documents[0],
connection,
message
)
);
}
});
// Add the connection details
message.hashedName = connection.hashedName;
// Return the documents
handleOperationCallback(
self,
workItem.cb,
null,
new CommandResult(workItem.fullResult ? message : message.documents[0], connection, message)
);
}
};

@@ -635,2 +506,8 @@ }

function totalConnectionCount(pool) {
return (
pool.availableConnections.length + pool.inUseConnections.length + pool.connectingConnections
);
}
/**

@@ -642,3 +519,3 @@ * Return all pool connections

Pool.prototype.allConnections = function() {
return this.availableConnections.concat(this.inUseConnections).concat(this.connectingConnections);
return this.availableConnections.concat(this.inUseConnections);
};

@@ -674,7 +551,2 @@

// Might be authenticating, but we are still connected
if (connections.length === 0 && this.authenticating) {
return true;
}
// Not connected

@@ -704,3 +576,2 @@ return false;

* Connect pool
* @method
*/

@@ -712,15 +583,32 @@ Pool.prototype.connect = function() {

var self = this;
// Transition to connecting state
const self = this;
stateTransition(this, CONNECTING);
// Create an array of the arguments
var args = Array.prototype.slice.call(arguments, 0);
// Create a connection
var connection = new Connection(messageHandler(self), this.options);
// Add to list of connections
this.connectingConnections.push(connection);
// Add listeners to the connection
connection.once('connect', function(connection) {
if (self.state === DESTROYED || self.state === DESTROYING) return self.destroy();
self.connectingConnections++;
connect(self.options, (err, connection) => {
self.connectingConnections--;
if (err) {
if (self.logger.isDebug()) {
self.logger.debug(`connection attempt failed with error [${JSON.stringify(err)}]`);
}
if (self.state === CONNECTING) {
self.emit('error', err);
}
return;
}
if (self.state === DESTROYED || self.state === DESTROYING) {
return self.destroy();
}
// attach event handlers
connection.on('error', self._connectionErrorHandler);
connection.on('close', self._connectionCloseHandler);
connection.on('timeout', self._connectionTimeoutHandler);
connection.on('parseError', self._connectionParseErrorHandler);
connection.on('message', self._messageHandler);
// If we are in a topology, delegate the auth to it

@@ -730,66 +618,27 @@ // This is to avoid issues where we would auth against an

if (self.options.inTopology) {
// Set connected mode
stateTransition(self, CONNECTED);
self.availableConnections.push(connection);
return self.emit('connect', self, connection);
}
// Move the active connection
moveConnectionBetween(connection, self.connectingConnections, self.availableConnections);
if (self.state === DESTROYED || self.state === DESTROYING) {
return self.destroy();
}
// Emit the connect event
return self.emit('connect', self);
if (err) {
self.destroy();
return self.emit('error', err);
}
// Apply any store credentials
reauthenticate(self, connection, function(err) {
if (self.state === DESTROYED || self.state === DESTROYING) return self.destroy();
stateTransition(self, CONNECTED);
self.availableConnections.push(connection);
// We have an error emit it
if (err) {
// Destroy the pool
self.destroy();
// Emit the error
return self.emit('error', err);
if (self.minSize) {
for (let i = 0; i < self.minSize; i++) {
_createConnection(self);
}
}
// Authenticate
authenticate(self, args, connection, function(err) {
if (self.state === DESTROYED || self.state === DESTROYING) return self.destroy();
// We have an error emit it
if (err) {
// Destroy the pool
self.destroy();
// Emit the error
return self.emit('error', err);
}
// Set connected mode
stateTransition(self, CONNECTED);
// Move the active connection
moveConnectionBetween(connection, self.connectingConnections, self.availableConnections);
// if we have a minPoolSize, create a connection
if (self.minSize) {
for (let i = 0; i < self.minSize; i++) _createConnection(self);
}
// Emit the connect event
self.emit('connect', self);
});
});
self.emit('connect', self, connection);
});
// Add error handlers
connection.once('error', connectionFailureHandler(this, 'error'));
connection.once('close', connectionFailureHandler(this, 'close'));
connection.once('timeout', connectionFailureHandler(this, 'timeout'));
connection.once('parseError', connectionFailureHandler(this, 'parseError'));
try {
connection.connect();
} catch (err) {
// SSL or something threw on connect
process.nextTick(function() {
self.emit('error', err);
});
}
};

@@ -799,94 +648,6 @@

* Authenticate using a specified mechanism
* @method
* @param {string} mechanism The Auth mechanism we are invoking
* @param {string} db The db we are invoking the mechanism against
* @param {...object} param Parameters for the specific mechanism
* @param {authResultCallback} callback A callback function
*/
Pool.prototype.auth = function(mechanism) {
var self = this;
var args = Array.prototype.slice.call(arguments, 0);
var callback = args.pop();
// If we don't have the mechanism fail
if (self.authProviders[mechanism] == null && mechanism !== 'default') {
throw new MongoError(f('auth provider %s does not exist', mechanism));
}
// Signal that we are authenticating a new set of credentials
this.authenticating = true;
this.authenticatingTimestamp = new Date().getTime();
// Authenticate all live connections
function authenticateLiveConnections(self, args, cb) {
// Get the current viable connections
var connections = self.allConnections();
// Allow nothing else to use the connections while we authenticate them
self.availableConnections = [];
self.inUseConnections = [];
self.connectingConnections = [];
var connectionsCount = connections.length;
var error = null;
// No connections available, return
if (connectionsCount === 0) {
self.authenticating = false;
return callback(null);
}
// Authenticate the connections
for (var i = 0; i < connections.length; i++) {
authenticate(self, args, connections[i], function(err, result) {
connectionsCount = connectionsCount - 1;
// Store the error
if (err) error = err;
// Processed all connections
if (connectionsCount === 0) {
// Auth finished
self.authenticating = false;
// Add the connections back to available connections
self.availableConnections = self.availableConnections.concat(connections);
// We had an error, return it
if (error) {
// Log the error
if (self.logger.isError()) {
self.logger.error(
f(
'[%s] failed to authenticate against server %s:%s',
self.id,
self.options.host,
self.options.port
)
);
}
return cb(error, result);
}
cb(null, result);
}
});
}
}
// Wait for a logout in process to happen
function waitForLogout(self, cb) {
if (!self.loggingout) return cb();
setTimeout(function() {
waitForLogout(self, cb);
}, 1);
}
// Wait for loggout to finish
waitForLogout(self, function() {
// Authenticate all live connections
authenticateLiveConnections(self, args, function(err, result) {
// Credentials correctly stored in auth provider if successful
// Any new connections will now reauthenticate correctly
self.authenticating = false;
// Return after authentication connections
callback(err, result);
});
});
Pool.prototype.auth = function(credentials, callback) {
callback(null, null);
};

@@ -896,46 +657,6 @@

* Logout all users against a database
* @method
* @param {string} dbName The database name
* @param {authResultCallback} callback A callback function
*/
Pool.prototype.logout = function(dbName, callback) {
var self = this;
if (typeof dbName !== 'string') {
throw new MongoError('logout method requires a db name as first argument');
}
if (typeof callback !== 'function') {
throw new MongoError('logout method requires a callback');
}
// Indicate logout in process
this.loggingout = true;
// Get all relevant connections
var connections = self.availableConnections.concat(self.inUseConnections);
var count = connections.length;
// Store any error
var error = null;
// Send logout command over all the connections
for (var i = 0; i < connections.length; i++) {
write(self)(
connections[i],
new Query(
this.options.bson,
f('%s.$cmd', dbName),
{ logout: 1 },
{ numberToSkip: 0, numberToReturn: 1 }
),
function(err) {
count = count - 1;
if (err) error = err;
if (count === 0) {
self.loggingout = false;
callback(error);
}
}
);
}
callback(null, null);
};

@@ -949,5 +670,4 @@

// Get all the known connections
var connections = this.availableConnections
.concat(this.inUseConnections)
.concat(this.connectingConnections);
var connections = this.availableConnections.concat(this.inUseConnections);
connections.forEach(function(c) {

@@ -959,3 +679,3 @@ c.unref();

// Events
var events = ['error', 'close', 'timeout', 'parseError', 'connect'];
var events = ['error', 'close', 'timeout', 'parseError', 'connect', 'message'];

@@ -977,4 +697,3 @@ // Destroy the connections

self.availableConnections = [];
self.nonAuthenticatedConnections = [];
self.connectingConnections = [];
self.connectingConnections = 0;

@@ -999,6 +718,3 @@ // Set state to destroyed

// Get all the known connections
var connections = self.availableConnections
.concat(self.inUseConnections)
.concat(self.nonAuthenticatedConnections)
.concat(self.connectingConnections);
var connections = self.availableConnections.concat(self.inUseConnections);

@@ -1035,6 +751,3 @@ // Flush any remaining work items with

// Get all the known connections
var connections = self.availableConnections
.concat(self.inUseConnections)
.concat(self.nonAuthenticatedConnections)
.concat(self.connectingConnections);
var connections = self.availableConnections.concat(self.inUseConnections);

@@ -1064,52 +777,64 @@ // Check if we have any in flight operations

/**
* Reset all connections of this pool
*
* @param {function} [callback]
*/
Pool.prototype.reset = function(callback) {
// this.destroy(true, err => {
// if (err && typeof callback === 'function') {
// callback(err, null);
// return;
// }
// stateTransition(this, DISCONNECTED);
// this.connect();
// if (typeof callback === 'function') callback(null, null);
// });
if (typeof callback === 'function') callback();
};
// Prepare the buffer that Pool.prototype.write() uses to send to the server
var serializeCommands = function(self, commands, result, callback) {
// Base case when there are no more commands to serialize
if (commands.length === 0) return callback(null, result);
function serializeCommand(self, command, callback) {
const originalCommandBuffer = command.toBin();
// Pop off the zeroth command and serialize it
var thisCommand = commands.shift();
var originalCommandBuffer = thisCommand.toBin();
// Check whether we and the server have agreed to use a compressor
if (self.options.agreedCompressor && !hasUncompressibleCommands(thisCommand)) {
// Transform originalCommandBuffer into OP_COMPRESSED
var concatenatedOriginalCommandBuffer = Buffer.concat(originalCommandBuffer);
var messageToBeCompressed = concatenatedOriginalCommandBuffer.slice(MESSAGE_HEADER_SIZE);
const shouldCompress = !!self.options.agreedCompressor;
if (!shouldCompress || !canCompress(command)) {
return callback(null, originalCommandBuffer);
}
// Extract information needed for OP_COMPRESSED from the uncompressed message
var originalCommandOpCode = concatenatedOriginalCommandBuffer.readInt32LE(12);
// Transform originalCommandBuffer into OP_COMPRESSED
const concatenatedOriginalCommandBuffer = Buffer.concat(originalCommandBuffer);
const messageToBeCompressed = concatenatedOriginalCommandBuffer.slice(MESSAGE_HEADER_SIZE);
// Compress the message body
compress(self, messageToBeCompressed, function(err, compressedMessage) {
if (err) return callback(err, null);
// Extract information needed for OP_COMPRESSED from the uncompressed message
const originalCommandOpCode = concatenatedOriginalCommandBuffer.readInt32LE(12);
// Create the msgHeader of OP_COMPRESSED
var msgHeader = Buffer.alloc(MESSAGE_HEADER_SIZE);
msgHeader.writeInt32LE(MESSAGE_HEADER_SIZE + 9 + compressedMessage.length, 0); // messageLength
msgHeader.writeInt32LE(thisCommand.requestId, 4); // requestID
msgHeader.writeInt32LE(0, 8); // responseTo (zero)
msgHeader.writeInt32LE(opcodes.OP_COMPRESSED, 12); // opCode
// Compress the message body
compress(self, messageToBeCompressed, function(err, compressedMessage) {
if (err) return callback(err, null);
// Create the compression details of OP_COMPRESSED
var compressionDetails = Buffer.alloc(9);
compressionDetails.writeInt32LE(originalCommandOpCode, 0); // originalOpcode
compressionDetails.writeInt32LE(messageToBeCompressed.length, 4); // Size of the uncompressed compressedMessage, excluding the MsgHeader
compressionDetails.writeUInt8(compressorIDs[self.options.agreedCompressor], 8); // compressorID
// Create the msgHeader of OP_COMPRESSED
const msgHeader = Buffer.alloc(MESSAGE_HEADER_SIZE);
msgHeader.writeInt32LE(
MESSAGE_HEADER_SIZE + COMPRESSION_DETAILS_SIZE + compressedMessage.length,
0
); // messageLength
msgHeader.writeInt32LE(command.requestId, 4); // requestID
msgHeader.writeInt32LE(0, 8); // responseTo (zero)
msgHeader.writeInt32LE(opcodes.OP_COMPRESSED, 12); // opCode
// Push the concatenation of the OP_COMPRESSED message onto results
result.push(Buffer.concat([msgHeader, compressionDetails, compressedMessage]));
// Create the compression details of OP_COMPRESSED
const compressionDetails = Buffer.alloc(COMPRESSION_DETAILS_SIZE);
compressionDetails.writeInt32LE(originalCommandOpCode, 0); // originalOpcode
compressionDetails.writeInt32LE(messageToBeCompressed.length, 4); // Size of the uncompressed compressedMessage, excluding the MsgHeader
compressionDetails.writeUInt8(compressorIDs[self.options.agreedCompressor], 8); // compressorID
// Continue recursing through the commands array
serializeCommands(self, commands, result, callback);
});
} else {
// Push the serialization of the command onto results
result.push(originalCommandBuffer);
return callback(null, [msgHeader, compressionDetails, compressedMessage]);
});
}
// Continue recursing through the commands array
serializeCommands(self, commands, result, callback);
}
};
/**

@@ -1120,3 +845,3 @@ * Write a message to MongoDB

*/
Pool.prototype.write = function(commands, options, cb) {
Pool.prototype.write = function(command, options, cb) {
var self = this;

@@ -1201,9 +926,4 @@ // Ensure we have a callback

// Ensure commands is an array
if (!Array.isArray(commands)) {
commands = [commands];
}
// Get the requestId
operation.requestId = commands[commands.length - 1].requestId;
operation.requestId = command.requestId;

@@ -1244,13 +964,14 @@ if (hasSessionSupport(this.topology)) {

// decorate the commands with session-specific details
commands.forEach(command => {
if (command instanceof Query) {
if (command.query.$query) {
Object.assign(command.query.$query, sessionOptions);
} else {
Object.assign(command.query, sessionOptions);
}
} else {
Object.assign(command, sessionOptions);
}
});
let commandDocument = command;
if (command instanceof Query) {
commandDocument = command.query;
} else if (command instanceof Msg) {
commandDocument = command.command;
}
if (commandDocument.$query) {
commandDocument = commandDocument.$query;
}
Object.assign(commandDocument, sessionOptions);
}

@@ -1260,5 +981,2 @@

if (self.options.monitorCommands) {
// NOTE: there is only ever a single command, for some legacy reason I am unaware of we
// treat this as a potential array of commands
const command = commands[0];
this.emit('commandStarted', new apm.CommandStartedEvent(this, command));

@@ -1292,7 +1010,7 @@

// Prepare the operation buffer
serializeCommands(self, commands, [], function(err, serializedCommands) {
serializeCommand(self, command, (err, serializedBuffers) => {
if (err) throw err;
// Set the operation's buffer to the serialization of the commands
operation.buffer = serializedCommands;
operation.buffer = serializedBuffers;

@@ -1318,7 +1036,7 @@ // If we have a monitoring operation schedule as the very first operation

// Will return true if command contains no uncompressible command terms
var hasUncompressibleCommands = function(command) {
return uncompressibleCommands.some(function(cmd) {
return command.query.hasOwnProperty(cmd);
});
};
function canCompress(command) {
const commandDoc = command instanceof Msg ? command.command : command.query;
const commandName = Object.keys(commandDoc)[0];
return uncompressibleCommands.indexOf(commandName) === -1;
}

@@ -1338,9 +1056,5 @@ // Remove connection method

if (remove(connection, self.inUseConnections)) return;
if (remove(connection, self.connectingConnections)) return;
if (remove(connection, self.nonAuthenticatedConnections)) return;
}
// All event handlers
var handlers = ['close', 'message', 'error', 'timeout', 'parseError', 'connect'];
const handlers = ['close', 'message', 'error', 'timeout', 'parseError', 'connect'];
function _createConnection(self) {

@@ -1350,79 +1064,47 @@ if (self.state === DESTROYED || self.state === DESTROYING) {

}
var connection = new Connection(messageHandler(self), self.options);
// Push the connection
self.connectingConnections.push(connection);
self.connectingConnections++;
connect(self.options, (err, connection) => {
self.connectingConnections--;
// Handle any errors
var tempErrorHandler = function(_connection) {
return function() {
// Destroy the connection
_connection.destroy();
// Remove the connection from the connectingConnections list
removeConnection(self, _connection);
// Start reconnection attempts
if (err) {
if (self.logger.isDebug()) {
self.logger.debug(`connection attempt failed with error [${JSON.stringify(err)}]`);
}
if (!self.reconnectId && self.options.reconnect) {
self.reconnectId = setTimeout(attemptReconnect(self), self.options.reconnectInterval);
}
};
};
// Handle successful connection
var tempConnectHandler = function(_connection) {
return function() {
// Destroyed state return
if (self.state === DESTROYED || self.state === DESTROYING) {
// Remove the connection from the list
removeConnection(self, _connection);
return _connection.destroy();
}
return;
}
// Destroy all event emitters
handlers.forEach(function(e) {
_connection.removeAllListeners(e);
});
if (self.state === DESTROYED || self.state === DESTROYING) {
removeConnection(self, connection);
return connection.destroy();
}
// Add the final handlers
_connection.once('close', connectionFailureHandler(self, 'close'));
_connection.once('error', connectionFailureHandler(self, 'error'));
_connection.once('timeout', connectionFailureHandler(self, 'timeout'));
_connection.once('parseError', connectionFailureHandler(self, 'parseError'));
connection.on('error', self._connectionErrorHandler);
connection.on('close', self._connectionCloseHandler);
connection.on('timeout', self._connectionTimeoutHandler);
connection.on('parseError', self._connectionParseErrorHandler);
connection.on('message', self._messageHandler);
// Signal
reauthenticate(self, _connection, function(err) {
if (self.state === DESTROYED || self.state === DESTROYING) {
return _connection.destroy();
}
// Remove the connection from the connectingConnections list
removeConnection(self, _connection);
if (self.state === DESTROYED || self.state === DESTROYING) {
return connection.destroy();
}
// Handle error
if (err) {
return _connection.destroy();
}
// Remove the connection from the connectingConnections list
removeConnection(self, connection);
// If we are c at the moment
// Do not automatially put in available connections
// As we need to apply the credentials first
if (self.authenticating) {
self.nonAuthenticatedConnections.push(_connection);
} else {
// Push to available
self.availableConnections.push(_connection);
// Execute any work waiting
_execute(self)();
}
});
};
};
// Handle error
if (err) {
return connection.destroy();
}
// Add all handlers
connection.once('close', tempErrorHandler(connection));
connection.once('error', tempErrorHandler(connection));
connection.once('timeout', tempErrorHandler(connection));
connection.once('parseError', tempErrorHandler(connection));
connection.once('connect', tempConnectHandler(connection));
// Start connection
connection.connect();
// Push to available
self.availableConnections.push(connection);
// Execute any work waiting
_execute(self)();
});
}

@@ -1450,181 +1132,153 @@

// Wait for auth to clear before continuing
function waitForAuth(cb) {
if (!self.authenticating) return cb();
// Wait for a milisecond and try again
setTimeout(function() {
waitForAuth(cb);
}, 1);
// New pool connections are in progress, wait them to finish
// before executing any more operation to ensure distribution of
// operations
if (self.connectingConnections > 0) {
self.executing = false;
return;
}
// Block on any auth in process
waitForAuth(function() {
// New pool connections are in progress, wait them to finish
// before executing any more operation to ensure distribution of
// operations
if (self.connectingConnections.length > 0) {
return;
// As long as we have available connections
// eslint-disable-next-line
while (true) {
// Total availble connections
const totalConnections = totalConnectionCount(self);
// No available connections available, flush any monitoring ops
if (self.availableConnections.length === 0) {
// Flush any monitoring operations
flushMonitoringOperations(self.queue);
break;
}
// As long as we have available connections
// eslint-disable-next-line
while (true) {
// Total availble connections
var totalConnections =
self.availableConnections.length +
self.connectingConnections.length +
self.inUseConnections.length;
// No queue break
if (self.queue.length === 0) {
break;
}
// No available connections available, flush any monitoring ops
if (self.availableConnections.length === 0) {
// Flush any monitoring operations
flushMonitoringOperations(self.queue);
break;
}
var connection = null;
const connections = self.availableConnections.filter(conn => conn.workItems.length === 0);
// No queue break
if (self.queue.length === 0) {
break;
}
// No connection found that has no work on it, just pick one for pipelining
if (connections.length === 0) {
connection =
self.availableConnections[self.connectionIndex++ % self.availableConnections.length];
} else {
connection = connections[self.connectionIndex++ % connections.length];
}
// Get a connection
var connection = null;
// Is the connection connected
if (!connection.isConnected()) {
// Remove the disconnected connection
removeConnection(self, connection);
// Flush any monitoring operations in the queue, failing fast
flushMonitoringOperations(self.queue);
break;
}
// Locate all connections that have no work
var connections = [];
// Get a list of all connections
for (var i = 0; i < self.availableConnections.length; i++) {
if (self.availableConnections[i].workItems.length === 0) {
connections.push(self.availableConnections[i]);
// Get the next work item
var workItem = self.queue.shift();
// If we are monitoring we need to use a connection that is not
// running another operation to avoid socket timeout changes
// affecting an existing operation
if (workItem.monitoring) {
var foundValidConnection = false;
for (let i = 0; i < self.availableConnections.length; i++) {
// If the connection is connected
// And there are no pending workItems on it
// Then we can safely use it for monitoring.
if (
self.availableConnections[i].isConnected() &&
self.availableConnections[i].workItems.length === 0
) {
foundValidConnection = true;
connection = self.availableConnections[i];
break;
}
}
// No connection found that has no work on it, just pick one for pipelining
if (connections.length === 0) {
connection =
self.availableConnections[self.connectionIndex++ % self.availableConnections.length];
} else {
connection = connections[self.connectionIndex++ % connections.length];
}
// No safe connection found, attempt to grow the connections
// if possible and break from the loop
if (!foundValidConnection) {
// Put workItem back on the queue
self.queue.unshift(workItem);
// Is the connection connected
if (connection.isConnected()) {
// Get the next work item
var workItem = self.queue.shift();
// If we are monitoring we need to use a connection that is not
// running another operation to avoid socket timeout changes
// affecting an existing operation
if (workItem.monitoring) {
var foundValidConnection = false;
for (i = 0; i < self.availableConnections.length; i++) {
// If the connection is connected
// And there are no pending workItems on it
// Then we can safely use it for monitoring.
if (
self.availableConnections[i].isConnected() &&
self.availableConnections[i].workItems.length === 0
) {
foundValidConnection = true;
connection = self.availableConnections[i];
break;
}
}
// No safe connection found, attempt to grow the connections
// if possible and break from the loop
if (!foundValidConnection) {
// Put workItem back on the queue
self.queue.unshift(workItem);
// Attempt to grow the pool if it's not yet maxsize
if (totalConnections < self.options.size && self.queue.length > 0) {
// Create a new connection
_createConnection(self);
}
// Re-execute the operation
setTimeout(function() {
_execute(self)();
}, 10);
break;
}
// Attempt to grow the pool if it's not yet maxsize
if (totalConnections < self.options.size && self.queue.length > 0) {
// Create a new connection
_createConnection(self);
}
// Don't execute operation until we have a full pool
if (totalConnections < self.options.size) {
// Connection has work items, then put it back on the queue
// and create a new connection
if (connection.workItems.length > 0) {
// Lets put the workItem back on the list
self.queue.unshift(workItem);
// Create a new connection
_createConnection(self);
// Break from the loop
break;
}
}
// Re-execute the operation
setTimeout(function() {
_execute(self)();
}, 10);
// Get actual binary commands
var buffer = workItem.buffer;
break;
}
}
// Set current status of authentication process
workItem.authenticating = self.authenticating;
workItem.authenticatingTimestamp = self.authenticatingTimestamp;
// Don't execute operation until we have a full pool
if (totalConnections < self.options.size) {
// Connection has work items, then put it back on the queue
// and create a new connection
if (connection.workItems.length > 0) {
// Lets put the workItem back on the list
self.queue.unshift(workItem);
// Create a new connection
_createConnection(self);
// Break from the loop
break;
}
}
// If we are monitoring take the connection of the availableConnections
if (workItem.monitoring) {
moveConnectionBetween(connection, self.availableConnections, self.inUseConnections);
}
// Get actual binary commands
var buffer = workItem.buffer;
// Track the executing commands on the mongo server
// as long as there is an expected response
if (!workItem.noResponse) {
connection.workItems.push(workItem);
}
// If we are monitoring take the connection of the availableConnections
if (workItem.monitoring) {
moveConnectionBetween(connection, self.availableConnections, self.inUseConnections);
}
// We have a custom socketTimeout
if (!workItem.immediateRelease && typeof workItem.socketTimeout === 'number') {
connection.setSocketTimeout(workItem.socketTimeout);
}
// Track the executing commands on the mongo server
// as long as there is an expected response
if (!workItem.noResponse) {
connection.workItems.push(workItem);
}
// Capture if write was successful
var writeSuccessful = true;
// We have a custom socketTimeout
if (!workItem.immediateRelease && typeof workItem.socketTimeout === 'number') {
connection.setSocketTimeout(workItem.socketTimeout);
}
// Put operation on the wire
if (Array.isArray(buffer)) {
for (i = 0; i < buffer.length; i++) {
writeSuccessful = connection.write(buffer[i]);
}
} else {
writeSuccessful = connection.write(buffer);
}
// Capture if write was successful
var writeSuccessful = true;
// if the command is designated noResponse, call the callback immeditely
if (workItem.noResponse && typeof workItem.cb === 'function') {
workItem.cb(null, null);
}
if (writeSuccessful && workItem.immediateRelease && self.authenticating) {
removeConnection(self, connection);
self.nonAuthenticatedConnections.push(connection);
} else if (writeSuccessful === false) {
// If write not successful put back on queue
self.queue.unshift(workItem);
// Remove the disconnected connection
removeConnection(self, connection);
// Flush any monitoring operations in the queue, failing fast
flushMonitoringOperations(self.queue);
}
} else {
// Remove the disconnected connection
removeConnection(self, connection);
// Flush any monitoring operations in the queue, failing fast
flushMonitoringOperations(self.queue);
// Put operation on the wire
if (Array.isArray(buffer)) {
for (let i = 0; i < buffer.length; i++) {
writeSuccessful = connection.write(buffer[i]);
}
} else {
writeSuccessful = connection.write(buffer);
}
});
// if the command is designated noResponse, call the callback immeditely
if (workItem.noResponse && typeof workItem.cb === 'function') {
workItem.cb(null, null);
}
if (writeSuccessful === false) {
// If write not successful put back on queue
self.queue.unshift(workItem);
// Remove the disconnected connection
removeConnection(self, connection);
// Flush any monitoring operations in the queue, failing fast
flushMonitoringOperations(self.queue);
break;
}
}
self.executing = false;

@@ -1631,0 +1285,0 @@ };

'use strict';
var f = require('util').format,
require_optional = require('require_optional');
const require_optional = require('require_optional');
// Set property function
var setProperty = function(obj, prop, flag, values) {
Object.defineProperty(obj, prop.name, {
enumerable: true,
set: function(value) {
if (typeof value !== 'boolean') throw new Error(f('%s required a boolean', prop.name));
// Flip the bit to 1
if (value === true) values.flags |= flag;
// Flip the bit to 0 if it's set, otherwise ignore
if (value === false && (values.flags & flag) === flag) values.flags ^= flag;
prop.value = value;
},
get: function() {
return prop.value;
}
});
};
// Set property function
var getProperty = function(obj, propName, fieldName, values, func) {
Object.defineProperty(obj, propName, {
enumerable: true,
get: function() {
// Not parsed yet, parse it
if (values[fieldName] == null && obj.isParsed && !obj.isParsed()) {
obj.parse();
}
// Do we have a post processing function
if (typeof func === 'function') return func(values[fieldName]);
// Return raw value
return values[fieldName];
}
});
};
// Set simple property
var getSingleProperty = function(obj, name, value) {
Object.defineProperty(obj, name, {
enumerable: true,
get: function() {
return value;
}
});
};
// Shallow copy
var copy = function(fObj, tObj) {
tObj = tObj || {};
for (var name in fObj) tObj[name] = fObj[name];
return tObj;
};
var debugOptions = function(debugFields, options) {
function debugOptions(debugFields, options) {
var finaloptions = {};

@@ -66,5 +12,5 @@ debugFields.forEach(function(n) {

return finaloptions;
};
}
var retrieveBSON = function() {
function retrieveBSON() {
var BSON = require('bson');

@@ -82,13 +28,13 @@ BSON.native = false;

return BSON;
};
}
// Throw an error if an attempt to use Snappy is made when Snappy is not installed
var noSnappyWarning = function() {
function noSnappyWarning() {
throw new Error(
'Attempted to use Snappy compression, but Snappy is not installed. Install or disable Snappy compression and try again.'
);
};
}
// Facilitate loading Snappy optionally
var retrieveSnappy = function() {
function retrieveSnappy() {
var snappy = null;

@@ -107,10 +53,8 @@ try {

return snappy;
}
module.exports = {
debugOptions,
retrieveBSON,
retrieveSnappy
};
exports.setProperty = setProperty;
exports.getProperty = getProperty;
exports.getSingleProperty = getSingleProperty;
exports.copy = copy;
exports.debugOptions = debugOptions;
exports.retrieveBSON = retrieveBSON;
exports.retrieveSnappy = retrieveSnappy;

@@ -10,3 +10,3 @@ 'use strict';

const collationNotSupported = require('./utils').collationNotSupported;
const wireProtocol = require('./wireprotocol');
const BSON = retrieveBSON();

@@ -227,10 +227,3 @@ const Long = BSON.Long;

this.server.wireProtocolHandler.getMore(
this.server,
this.ns,
this.cursorState,
batchSize,
this.options,
callback
);
wireProtocol.getMore(this.server, this.ns, this.cursorState, batchSize, this.options, callback);
};

@@ -344,3 +337,3 @@

this.server.wireProtocolHandler.killCursor(this.server, this.ns, this.cursorState, callback);
wireProtocol.killCursors(this.server, this.ns, this.cursorState, callback);
};

@@ -619,3 +612,12 @@

return cursor.topology.selectServer(cursor.options, (err, server) => {
// Very explicitly choose what is passed to selectServer
const serverSelectOptions = {};
if (cursor.cursorState.session) {
serverSelectOptions.session = cursor.cursorState.session;
}
if (cursor.options.readPreference) {
serverSelectOptions.readPreference = cursor.options.readPreference;
}
return cursor.topology.selectServer(serverSelectOptions, (err, server) => {
if (err) {

@@ -714,3 +716,4 @@ const disconnectHandler = cursor.disconnectHandler;

// Otherwise fall back to regular find path
cursor.cursorState.cursorId = result.cursorId;
const cursorId = result.cursorId || 0;
cursor.cursorState.cursorId = Long.fromNumber(cursorId);
cursor.cursorState.documents = result.documents;

@@ -740,3 +743,3 @@ cursor.cursorState.lastCursorId = result.cursorId;

if (cursor.cmd.find != null) {
cursor.server.wireProtocolHandler.query(
wireProtocol.query(
cursor.server,

@@ -753,3 +756,3 @@ cursor.ns,

cursor.query = cursor.server.wireProtocolHandler.command(
cursor.query = wireProtocol.command(
cursor.server,

@@ -756,0 +759,0 @@ cursor.ns,

@@ -45,2 +45,6 @@ 'use strict';

}
hasErrorLabel(label) {
return this.errorLabels && this.errorLabels.indexOf(label) !== -1;
}
}

@@ -47,0 +51,0 @@

@@ -125,3 +125,11 @@ 'use strict';

*/
function monitorServer(server) {
function monitorServer(server, options) {
options = options || {};
const heartbeatFrequencyMS = options.heartbeatFrequencyMS || 10000;
if (options.initial === true) {
server.s.monitorId = setTimeout(() => monitorServer(server), heartbeatFrequencyMS);
return;
}
// executes a single check of a server

@@ -134,2 +142,5 @@ const checkServer = callback => {

// NOTE: legacy monitoring event
process.nextTick(() => server.emit('monitoring', server));
server.command(

@@ -142,3 +153,3 @@ 'admin.$cmd',

},
function(err, result) {
(err, result) => {
let duration = calculateDurationInMs(start);

@@ -173,6 +184,3 @@

// schedule the next monitoring process
server.s.monitorId = setTimeout(
() => monitorServer(server),
server.s.options.heartbeatFrequencyMS
);
server.s.monitorId = setTimeout(() => monitorServer(server), heartbeatFrequencyMS);
};

@@ -191,17 +199,20 @@

// change its type to `Unknown` only after retrying once.
server.s.pool.reset(() => {
// otherwise re-attempt monitoring once
checkServer((error, isMaster) => {
if (error) {
server.s.monitoring = false;
// TODO: we need to reset the pool here
// we revert to an `Unknown` by emitting a default description with no isMaster
server.emit(
'descriptionReceived',
new ServerDescription(server.description.address, null, { error })
);
return checkServer((err, isMaster) => {
if (err) {
server.s.monitoring = false;
// we do not reschedule monitoring in this case
return;
}
// revert to `Unknown` by emitting a default description with no isMaster
server.emit('descriptionReceived', new ServerDescription(server.description.address));
// do not reschedule monitoring in this case
return;
}
successHandler(isMaster);
successHandler(isMaster);
});
});

@@ -208,0 +219,0 @@ });

@@ -25,2 +25,6 @@ 'use strict';

'maxWireVersion',
'maxBsonObjectSize',
'maxMessageSizeBytes',
'maxWriteBatchSize',
'compression',
'me',

@@ -35,3 +39,6 @@ 'hosts',

'primary',
'logicalSessionTimeoutMinutes'
'logicalSessionTimeoutMinutes',
'saslSupportedMechs',
'__nodejs_mock_server__',
'$clusterTime'
];

@@ -67,3 +74,3 @@

this.address = address;
this.error = null;
this.error = options.error || null;
this.roundTripTime = options.roundTripTime || 0;

@@ -81,2 +88,3 @@ this.lastUpdateTime = Date.now();

// normalize case for hosts
if (this.me) this.me = this.me.toLowerCase();
this.hosts = this.hosts.map(host => host.toLowerCase());

@@ -83,0 +91,0 @@ this.passives = this.passives.map(host => host.toLowerCase());

@@ -11,2 +11,5 @@ 'use strict';

/**
* Returns a server selector that selects for writable servers
*/
function writableServerSelector() {

@@ -18,3 +21,11 @@ return function(topologyDescription, servers) {

// reducers
/**
* Reduces the passed in array of servers by the rules of the "Max Staleness" specification
* found here: https://github.com/mongodb/specifications/blob/master/source/max-staleness/max-staleness.rst
*
* @param {ReadPreference} readPreference The read preference providing max staleness guidance
* @param {topologyDescription} topologyDescription The topology description
* @param {ServerDescription[]} servers The list of server descriptions to be reduced
* @return {ServerDescription[]} The list of servers that satisfy the requirements of max staleness
*/
function maxStalenessReducer(readPreference, topologyDescription, servers) {

@@ -29,3 +40,3 @@ if (readPreference.maxStalenessSeconds == null || readPreference.maxStalenessSeconds < 0) {

if (maxStaleness < maxStalenessVariance) {
throw MongoError(`maxStalenessSeconds must be at least ${maxStalenessVariance} seconds`);
throw new MongoError(`maxStalenessSeconds must be at least ${maxStalenessVariance} seconds`);
}

@@ -67,2 +78,8 @@

/**
* Determines whether a server's tags match a given set of tags
*
* @param {String[]} tagSet The requested tag set to match
* @param {String[]} serverTags The server's tags
*/
function tagSetMatch(tagSet, serverTags) {

@@ -81,2 +98,9 @@ const keys = Object.keys(tagSet);

/**
* Reduces a set of server descriptions based on tags requested by the read preference
*
* @param {ReadPreference} readPreference The read preference providing the requested tags
* @param {ServerDescription[]} servers The list of server descriptions to reduce
* @return {ServerDescription[]} The list of servers matching the requested tags
*/
function tagSetReducer(readPreference, servers) {

@@ -105,2 +129,11 @@ if (

/**
* Reduces a list of servers to ensure they fall within an acceptable latency window. This is
* further specified in the "Server Selection" specification, found here:
* https://github.com/mongodb/specifications/blob/master/source/server-selection/server-selection.rst
*
* @param {topologyDescription} topologyDescription The topology description
* @param {ServerDescription[]} servers The list of servers to reduce
* @returns {ServerDescription[]} The servers which fall within an acceptable latency window
*/
function latencyWindowReducer(topologyDescription, servers) {

@@ -137,2 +170,7 @@ const low = servers.reduce(

/**
* Returns a function which selects servers based on a provided read preference
*
* @param {ReadPreference} readPreference The read preference to select with
*/
function readPreferenceServerSelector(readPreference) {

@@ -139,0 +177,0 @@ if (!readPreference.isValid()) {

@@ -6,6 +6,3 @@ 'use strict';

const relayEvents = require('../utils').relayEvents;
const calculateDurationInMs = require('../utils').calculateDurationInMs;
const Query = require('../connection/commands').Query;
const TwoSixWireProtocolSupport = require('../wireprotocol/2_6_support');
const ThreeTwoWireProtocolSupport = require('../wireprotocol/3_2_support');
const wireProtocol = require('../wireprotocol');
const BSON = require('../connection/utils').retrieveBSON();

@@ -17,3 +14,39 @@ const createClientInfo = require('../topologies/shared').createClientInfo;

const monitorServer = require('./monitoring').monitorServer;
const MongoParseError = require('../error').MongoParseError;
const MongoNetworkError = require('../error').MongoNetworkError;
const collationNotSupported = require('../utils').collationNotSupported;
const debugOptions = require('../connection/utils').debugOptions;
// Used for filtering out fields for logging
const DEBUG_FIELDS = [
'reconnect',
'reconnectTries',
'reconnectInterval',
'emitError',
'cursorFactory',
'host',
'port',
'size',
'keepAlive',
'keepAliveInitialDelay',
'noDelay',
'connectionTimeout',
'checkServerIdentity',
'socketTimeout',
'ssl',
'ca',
'crl',
'cert',
'key',
'rejectUnauthorized',
'promoteLongs',
'promoteValues',
'promoteBuffers',
'servername'
];
const STATE_DISCONNECTED = 0;
const STATE_CONNECTING = 1;
const STATE_CONNECTED = 2;
/**

@@ -32,3 +65,3 @@ *

*/
constructor(description, options) {
constructor(description, options, topology) {
super();

@@ -49,4 +82,10 @@

monitoring: false,
// the implementation of the monitoring method
monitorFunction: options.monitorFunction || monitorServer,
// the connection pool
pool: null
pool: null,
// the server state
state: STATE_DISCONNECTED,
credentials: options.credentials,
topology
};

@@ -65,4 +104,2 @@ }

* Initiate server connect
*
* @param {Array} [options.auth] Array of auth options to apply on connect
*/

@@ -78,11 +115,23 @@ connect(options) {

// create a pool
this.s.pool = new Pool(this, Object.assign(this.s.options, options, { bson: this.s.bson }));
const addressParts = this.description.address.split(':');
const poolOptions = Object.assign(
{ host: addressParts[0], port: parseInt(addressParts[1], 10) },
this.s.options,
options,
{ bson: this.s.bson }
);
// Set up listeners
// NOTE: this should only be the case if we are connecting to a single server
poolOptions.reconnect = true;
this.s.pool = new Pool(this, poolOptions);
// setup listeners
this.s.pool.on('connect', connectEventHandler(this));
this.s.pool.on('close', closeEventHandler(this));
this.s.pool.on('close', errorEventHandler(this));
this.s.pool.on('error', errorEventHandler(this));
this.s.pool.on('parseError', parseErrorEventHandler(this));
// this.s.pool.on('error', errorEventHandler(this));
// it is unclear whether consumers should even know about these events
// this.s.pool.on('timeout', timeoutEventHandler(this));
// this.s.pool.on('parseError', errorEventHandler(this));
// this.s.pool.on('reconnect', reconnectEventHandler(this));

@@ -94,2 +143,4 @@ // this.s.pool.on('reconnectFailed', errorEventHandler(this));

this.s.state = STATE_CONNECTING;
// If auth settings have been provided, use them

@@ -107,10 +158,29 @@ if (options.auth) {

*
* @param {Boolean} [options.emitClose=false] Emit close event on destroy
* @param {Boolean} [options.emitDestroy=false] Emit destroy event on destroy
* @param {Boolean} [options.force=false] Force destroy the pool
*/
destroy(callback) {
if (typeof callback === 'function') {
callback(null, null);
destroy(options, callback) {
if (typeof options === 'function') (callback = options), (options = {});
options = Object.assign({}, { force: false }, options);
if (!this.s.pool) {
this.s.state = STATE_DISCONNECTED;
if (typeof callback === 'function') {
callback(null, null);
}
return;
}
['close', 'error', 'timeout', 'parseError', 'connect'].forEach(event => {
this.s.pool.removeAllListeners(event);
});
if (this.s.monitorId) {
clearTimeout(this.s.monitorId);
}
this.s.pool.destroy(options.force, err => {
this.s.state = STATE_DISCONNECTED;
callback(err);
});
}

@@ -122,6 +192,7 @@

*/
monitor() {
if (this.s.monitoring) return;
monitor(options) {
options = options || {};
if (this.s.state !== STATE_CONNECTED || this.s.monitoring) return;
if (this.s.monitorId) clearTimeout(this.s.monitorId);
monitorServer(this);
this.s.monitorFunction(this, options);
}

@@ -158,8 +229,12 @@

this.s.logger.debug(
`executing command [${JSON.stringify({ ns, cmd, options })}] against ${this.name}`
`executing command [${JSON.stringify({
ns,
cmd,
options: debugOptions(DEBUG_FIELDS, options)
})}] against ${this.name}`
);
}
// Check if we have collation support
if (this.description.maxWireVersion < 5 && cmd.collation) {
// error if collation not supported
if (collationNotSupported(this, cmd)) {
callback(new MongoError(`server ${this.name} does not support collation`));

@@ -169,23 +244,3 @@ return;

// Create the query object
const query = this.s.wireProtocolHandler.command(this, ns, cmd, {}, options);
// Set slave OK of the query
query.slaveOk = options.readPreference ? options.readPreference.slaveOk() : false;
// write options
const writeOptions = {
raw: typeof options.raw === 'boolean' ? options.raw : false,
promoteLongs: typeof options.promoteLongs === 'boolean' ? options.promoteLongs : true,
promoteValues: typeof options.promoteValues === 'boolean' ? options.promoteValues : true,
promoteBuffers: typeof options.promoteBuffers === 'boolean' ? options.promoteBuffers : false,
command: true,
monitoring: typeof options.monitoring === 'boolean' ? options.monitoring : false,
fullResult: typeof options.fullResult === 'boolean' ? options.fullResult : false,
requestId: query.requestId,
socketTimeout: typeof options.socketTimeout === 'number' ? options.socketTimeout : null,
session: options.session || null
};
// write the operation to the pool
this.s.pool.write(query, writeOptions, callback);
wireProtocol.command(this, ns, cmd, options, callback);
}

@@ -242,2 +297,11 @@

Object.defineProperty(Server.prototype, 'clusterTime', {
get: function() {
return this.s.topology.clusterTime;
},
set: function(clusterTime) {
this.s.topology.clusterTime = clusterTime;
}
});
function basicWriteValidations(server) {

@@ -282,4 +346,3 @@ if (!server.s.pool) {

// Check if we have collation support
if (server.description.maxWireVersion < 5 && options.collation) {
if (collationNotSupported(server, options)) {
callback(new MongoError(`server ${this.name} does not support collation`));

@@ -289,134 +352,52 @@ return;

// Execute write
return server.s.wireProtocolHandler[op](server.s.pool, ns, server.s.bson, ops, options, callback);
return wireProtocol[op](server, ns, ops, options, callback);
}
function saslSupportedMechs(options) {
if (!options) {
return {};
}
function connectEventHandler(server) {
return function(pool, conn) {
const ismaster = conn.ismaster;
server.s.lastIsMasterMS = conn.lastIsMasterMS;
if (conn.agreedCompressor) {
server.s.pool.options.agreedCompressor = conn.agreedCompressor;
}
const authArray = options.auth || [];
const authMechanism = authArray[0] || options.authMechanism;
const authSource = authArray[1] || options.authSource || options.dbName || 'admin';
const user = authArray[2] || options.user;
if (conn.zlibCompressionLevel) {
server.s.pool.options.zlibCompressionLevel = conn.zlibCompressionLevel;
}
if (typeof authMechanism === 'string' && authMechanism.toUpperCase() !== 'DEFAULT') {
return {};
}
if (conn.ismaster.$clusterTime) {
const $clusterTime = conn.ismaster.$clusterTime;
server.s.sclusterTime = $clusterTime;
}
if (!user) {
return {};
}
// log the connection event if requested
if (server.s.logger.isInfo()) {
server.s.logger.info(
`server ${server.name} connected with ismaster [${JSON.stringify(ismaster)}]`
);
}
return { saslSupportedMechs: `${authSource}.${user}` };
}
// emit an event indicating that our description has changed
server.emit('descriptionReceived', new ServerDescription(server.description.address, ismaster));
function extractIsMasterError(err, result) {
if (err) return err;
if (result && result.result && result.result.ok === 0) {
return new MongoError(result.result);
}
// we are connected and handshaked (guaranteed by the pool)
server.s.state = STATE_CONNECTED;
server.emit('connect', server);
};
}
function executeServerHandshake(server, callback) {
// construct an `ismaster` query
const compressors =
server.s.options.compression && server.s.options.compression.compressors
? server.s.options.compression.compressors
: [];
function errorEventHandler(server) {
return function(err) {
if (err) {
server.emit('error', new MongoNetworkError(err));
}
const queryOptions = { numberToSkip: 0, numberToReturn: -1, checkKeys: false, slaveOk: true };
const query = new Query(
server.s.bson,
'admin.$cmd',
Object.assign(
{ ismaster: true, client: server.s.clientInfo, compression: compressors },
saslSupportedMechs(server.s.options)
),
queryOptions
);
// execute the query
server.s.pool.write(
query,
{ socketTimeout: server.s.options.connectionTimeout || 2000 },
callback
);
}
function configureWireProtocolHandler(ismaster) {
// 3.2 wire protocol handler
if (ismaster.maxWireVersion >= 4) {
return new ThreeTwoWireProtocolSupport();
}
// default to 2.6 wire protocol handler
return new TwoSixWireProtocolSupport();
}
function connectEventHandler(server) {
return function() {
// log information of received information if in info mode
// if (server.s.logger.isInfo()) {
// var object = err instanceof MongoError ? JSON.stringify(err) : {};
// server.s.logger.info(`server ${server.name} fired event ${event} out with message ${object}`);
// }
// begin initial server handshake
const start = process.hrtime();
executeServerHandshake(server, (err, response) => {
// Set initial lastIsMasterMS - is this needed?
server.s.lastIsMasterMS = calculateDurationInMs(start);
const serverError = extractIsMasterError(err, response);
if (serverError) {
server.emit('error', serverError);
return;
}
// extract the ismaster from the server response
const isMaster = response.result;
// compression negotation
if (isMaster && isMaster.compression) {
const localCompressionInfo = server.s.options.compression;
const localCompressors = localCompressionInfo.compressors;
for (var i = 0; i < localCompressors.length; i++) {
if (isMaster.compression.indexOf(localCompressors[i]) > -1) {
server.s.pool.options.agreedCompressor = localCompressors[i];
break;
}
}
if (localCompressionInfo.zlibCompressionLevel) {
server.s.pool.options.zlibCompressionLevel = localCompressionInfo.zlibCompressionLevel;
}
}
// configure the wire protocol handler
server.s.wireProtocolHandler = configureWireProtocolHandler(isMaster);
// log the connection event if requested
if (server.s.logger.isInfo()) {
server.s.logger.info(
`server ${server.name} connected with ismaster [${JSON.stringify(isMaster)}]`
);
}
// emit an event indicating that our description has changed
server.emit(
'descriptionReceived',
new ServerDescription(server.description.address, isMaster)
);
// emit a connect event
server.emit('connect', isMaster);
});
server.emit('close');
};
}
function closeEventHandler(server) {
return function() {
server.emit('close');
function parseErrorEventHandler(server) {
return function(err) {
server.s.state = STATE_DISCONNECTED;
server.emit('error', new MongoParseError(err));
};

@@ -423,0 +404,0 @@ }

@@ -31,3 +31,12 @@ 'use strict';

*/
constructor(topologyType, serverDescriptions, setName, maxSetVersion, maxElectionId, options) {
constructor(
topologyType,
serverDescriptions,
setName,
maxSetVersion,
maxElectionId,
commonWireVersion,
options,
error
) {
options = options || {};

@@ -50,5 +59,9 @@

this.options = options;
this.error = error;
this.commonWireVersion = commonWireVersion || null;
// determine server compatibility
for (const serverDescription of this.servers.values()) {
if (serverDescription.type === ServerType.Unknown) continue;
if (serverDescription.minWireVersion > MAX_SUPPORTED_WIRE_VERSION) {

@@ -84,15 +97,2 @@ this.compatible = false;

/**
* @returns The minimum reported wire version of all known servers
*/
get commonWireVersion() {
return Array.from(this.servers.values())
.filter(server => server.type !== ServerType.Unknown)
.reduce(
(min, server) =>
min == null ? server.maxWireVersion : Math.min(min, server.maxWireVersion),
null
);
}
/**
* Returns a copy of this description updated with a given ServerDescription

@@ -112,2 +112,4 @@ *

let maxElectionId = this.maxElectionId;
let commonWireVersion = this.commonWireVersion;
let error = serverDescription.error || null;

@@ -117,2 +119,11 @@ const serverType = serverDescription.type;

// update common wire version
if (serverDescription.maxWireVersion !== 0) {
if (commonWireVersion == null) {
commonWireVersion = serverDescription.maxWireVersion;
} else {
commonWireVersion = Math.min(commonWireVersion, serverDescription.maxWireVersion);
}
}
// update the actual server description

@@ -129,3 +140,5 @@ serverDescriptions.set(address, serverDescription);

maxElectionId,
this.options
commonWireVersion,
this.options,
error
);

@@ -210,3 +223,5 @@ }

maxElectionId,
this.options
commonWireVersion,
this.options,
error
);

@@ -213,0 +228,0 @@ }

'use strict';
const EventEmitter = require('events');
const ServerDescription = require('./server_description').ServerDescription;
const ServerType = require('./server_description').ServerType;
const TopologyDescription = require('./topology_description').TopologyDescription;

@@ -9,3 +10,2 @@ const TopologyType = require('./topology_description').TopologyType;

const MongoTimeoutError = require('../error').MongoTimeoutError;
const MongoNetworkError = require('../error').MongoNetworkError;
const Server = require('./server');

@@ -17,6 +17,12 @@ const relayEvents = require('../utils').relayEvents;

const isRetryableWritesSupported = require('../topologies/shared').isRetryableWritesSupported;
const Cursor = require('./cursor');
const Cursor = require('../cursor');
const deprecate = require('util').deprecate;
const BSON = require('../connection/utils').retrieveBSON();
const createCompressionInfo = require('../topologies/shared').createCompressionInfo;
const isRetryableError = require('../error').isRetryableError;
const MongoParseError = require('../error').MongoParseError;
const ClientSession = require('../sessions').ClientSession;
const createClientInfo = require('../topologies/shared').createClientInfo;
const MongoError = require('../error').MongoError;
const resolveClusterTime = require('../topologies/shared').resolveClusterTime;

@@ -31,5 +37,27 @@ // Global state

heartbeatFrequencyMS: 30000,
minHeartbeatIntervalMS: 500
minHeartbeatFrequencyMS: 500
};
// events that we relay to the `Topology`
const SERVER_RELAY_EVENTS = [
'serverHeartbeatStarted',
'serverHeartbeatSucceeded',
'serverHeartbeatFailed',
'commandStarted',
'commandSucceeded',
'commandFailed',
// NOTE: Legacy events
'monitoring'
];
// all events we listen to from `Server` instances
const LOCAL_SERVER_EVENTS = SERVER_RELAY_EVENTS.concat([
'error',
'connect',
'descriptionReceived',
'close',
'ended'
]);
/**

@@ -60,3 +88,3 @@ * A container of server instances representing a connection to a MongoDB topology.

super();
if (typeof options === 'undefined') {
if (typeof options === 'undefined' && typeof seedlist !== 'string') {
options = seedlist;

@@ -72,2 +100,6 @@ seedlist = [];

seedlist = seedlist || [];
if (typeof seedlist === 'string') {
seedlist = parseStringSeedlist(seedlist);
}
options = Object.assign({}, TOPOLOGY_DEFAULTS, options);

@@ -78,2 +110,3 @@

const serverDescriptions = seedlist.reduce((result, seed) => {
if (seed.domain_socket) seed.host = seed.domain_socket;
const address = seed.port ? `${seed.host}:${seed.port}` : `${seed.host}:27017`;

@@ -88,3 +121,3 @@ result.set(address, new ServerDescription(address));

// passed in options
options: Object.assign({}, options),
options,
// initial seedlist of servers to connect to

@@ -99,2 +132,3 @@ seedlist: seedlist,

null,
null,
options

@@ -108,20 +142,13 @@ ),

// the bson parser
bson:
options.bson ||
new BSON([
BSON.Binary,
BSON.Code,
BSON.DBRef,
BSON.Decimal128,
BSON.Double,
BSON.Int32,
BSON.Long,
BSON.Map,
BSON.MaxKey,
BSON.MinKey,
BSON.ObjectId,
BSON.BSONRegExp,
BSON.Symbol,
BSON.Timestamp
])
bson: options.bson || new BSON(),
// a map of server instances to normalized addresses
servers: new Map(),
// Server Session Pool
sessionPool: null,
// Active client sessions
sessions: [],
// Promise library
promiseLibrary: options.promiseLibrary || Promise,
credentials: options.credentials,
clusterTime: null
};

@@ -131,2 +158,5 @@

this.s.options.compression = { compressors: createCompressionInfo(options) };
// add client info
this.s.clientInfo = createClientInfo(options);
}

@@ -141,2 +171,6 @@

get parserType() {
return BSON.native ? 'c++' : 'js';
}
/**

@@ -158,4 +192,8 @@ * All raw connections

* @param {Array} [options.auth=null] Array of auth options to apply on connect
* @param {function} [callback] An optional callback called once on the first connected server
*/
connect(/* options */) {
connect(options, callback) {
if (typeof options === 'function') (callback = options), (options = {});
options = options || {};
// emit SDAM monitoring events

@@ -176,2 +214,42 @@ this.emit('topologyOpening', new monitoring.TopologyOpeningEvent(this.s.id));

this.s.connected = true;
// otherwise, wait for a server to properly connect based on user provided read preference,
// or primary.
translateReadPreference(options);
const readPreference = options.readPreference || ReadPreference.primary;
this.selectServer(readPreferenceServerSelector(readPreference), options, (err, server) => {
if (err) {
if (typeof callback === 'function') {
callback(err, null);
} else {
this.emit('error', err);
}
return;
}
const errorHandler = err => {
server.removeListener('connect', connectHandler);
if (typeof callback === 'function') callback(err, null);
};
const connectHandler = (_, err) => {
server.removeListener('error', errorHandler);
this.emit('open', err, this);
this.emit('connect', this);
if (typeof callback === 'function') callback(err, this);
};
const STATE_CONNECTING = 1;
if (server.s.state === STATE_CONNECTING) {
server.once('error', errorHandler);
server.once('connect', connectHandler);
return;
}
connectHandler();
});
}

@@ -182,14 +260,37 @@

*/
close(callback) {
// destroy all child servers
this.s.servers.forEach(server => server.destroy());
close(options, callback) {
if (typeof options === 'function') (callback = options), (options = {});
options = options || {};
// emit an event for close
this.emit('topologyClosed', new monitoring.TopologyClosedEvent(this.s.id));
if (this.s.sessionPool) {
this.s.sessions.forEach(session => session.endSession());
this.s.sessionPool.endAllPooledSessions();
}
this.s.connected = false;
const servers = this.s.servers;
if (servers.size === 0) {
this.s.connected = false;
if (typeof callback === 'function') {
callback(null, null);
}
if (typeof callback === 'function') {
callback(null, null);
return;
}
// destroy all child servers
let destroyed = 0;
servers.forEach(server =>
destroyServer(server, this, () => {
destroyed++;
if (destroyed === servers.size) {
// emit an event for close
this.emit('topologyClosed', new monitoring.TopologyClosedEvent(this.s.id));
this.s.connected = false;
if (typeof callback === 'function') {
callback(null, null);
}
}
})
);
}

@@ -201,6 +302,21 @@

* @param {function} [selector] An optional selector to select servers by, defaults to a random selection within a latency window
* @param {object} [options] Optional settings related to server selection
* @param {number} [options.serverSelectionTimeoutMS] How long to block for server selection before throwing an error
* @param {function} callback The callback used to indicate success or failure
* @return {Server} An instance of a `Server` meeting the criteria of the predicate provided
*/
selectServer(selector, options, callback) {
if (typeof options === 'function') (callback = options), (options = {});
if (typeof options === 'function') {
callback = options;
if (typeof selector !== 'function') {
options = selector;
translateReadPreference(options);
const readPreference = options.readPreference || ReadPreference.primary;
selector = readPreferenceServerSelector(readPreference);
} else {
options = {};
}
}
options = Object.assign(

@@ -212,2 +328,11 @@ {},

const isSharded = this.description.type === TopologyType.Sharded;
const session = options.session;
const transaction = session && session.transaction;
if (isSharded && transaction && transaction.server) {
callback(null, transaction.server);
return;
}
selectServers(

@@ -220,3 +345,9 @@ this,

if (err) return callback(err, null);
callback(null, randomSelection(servers));
const selectedServer = randomSelection(servers);
if (isSharded && transaction && transaction.isActive) {
transaction.pinServer(selectedServer);
}
callback(null, selectedServer);
}

@@ -226,3 +357,46 @@ );

// Sessions related methods
/**
* @return Whether sessions are supported on the current topology
*/
hasSessionSupport() {
return this.description.logicalSessionTimeoutMinutes != null;
}
/**
* Start a logical session
*/
startSession(options, clientOptions) {
const session = new ClientSession(this, this.s.sessionPool, options, clientOptions);
session.once('ended', () => {
this.s.sessions = this.s.sessions.filter(s => !s.equals(session));
});
this.s.sessions.push(session);
return session;
}
/**
* Send endSessions command(s) with the given session ids
*
* @param {Array} sessions The sessions to end
* @param {function} [callback]
*/
endSessions(sessions, callback) {
if (!Array.isArray(sessions)) {
sessions = [sessions];
}
this.command(
'admin.$cmd',
{ endSessions: sessions },
{ readPreference: ReadPreference.primaryPreferred, noResponse: true },
() => {
// intentionally ignored, per spec
if (typeof callback === 'function') callback();
}
);
}
/**
* Update the internal TopologyDescription with a ServerDescription

@@ -243,2 +417,6 @@ *

this.s.description = this.s.description.update(serverDescription);
if (this.s.description.compatibilityError) {
this.emit('error', new MongoError(this.s.description.compatibilityError));
return;
}

@@ -259,2 +437,13 @@ // emit monitoring events for this change

// Driver Sessions Spec: "Whenever a driver receives a cluster time from
// a server it MUST compare it to the current highest seen cluster time
// for the deployment. If the new cluster time is higher than the
// highest seen cluster time it MUST become the new highest seen cluster
// time. Two cluster times are compared using only the BsonTimestamp
// value of the clusterTime embedded field."
const clusterTime = serverDescription.$clusterTime;
if (clusterTime) {
resolveClusterTime(this, clusterTime);
}
this.emit(

@@ -270,22 +459,9 @@ 'topologyDescriptionChanged',

/**
* Authenticate using a specified mechanism
*
* @param {String} mechanism The auth mechanism used for authentication
* @param {String} db The db we are authenticating against
* @param {Object} options Optional settings for the authenticating mechanism
* @param {authResultCallback} callback A callback function
*/
auth(mechanism, db, options, callback) {
callback(null, null);
auth(credentials, callback) {
if (typeof credentials === 'function') (callback = credentials), (credentials = null);
if (typeof callback === 'function') callback(null, true);
}
/**
* Logout from a database
*
* @param {String} db The db we are logging out from
* @param {authResultCallback} callback A callback function
*/
logout(db, callback) {
callback(null, null);
logout(callback) {
if (typeof callback === 'function') callback(null, true);
}

@@ -365,4 +541,6 @@

const readPreference = options.readPreference ? options.readPreference : ReadPreference.primary;
this.selectServer(readPreferenceServerSelector(readPreference), (err, server) => {
translateReadPreference(options);
const readPreference = options.readPreference || ReadPreference.primary;
this.selectServer(readPreferenceServerSelector(readPreference), options, (err, server) => {
if (err) {

@@ -373,3 +551,31 @@ callback(err, null);

server.command(ns, cmd, options, callback);
const willRetryWrite =
!options.retrying &&
!!options.retryWrites &&
options.session &&
isRetryableWritesSupported(this) &&
!options.session.inTransaction() &&
isWriteCommand(cmd);
const cb = (err, result) => {
if (!err) return callback(null, result);
if (!isRetryableError(err)) {
return callback(err);
}
if (willRetryWrite) {
const newOptions = Object.assign({}, options, { retrying: true });
return this.command(ns, cmd, newOptions, callback);
}
return callback(err);
};
// increment and assign txnNumber
if (willRetryWrite) {
options.session.incrementTransactionNumber();
options.willRetryWrite = willRetryWrite;
}
server.command(ns, cmd, options, cb);
});

@@ -398,7 +604,57 @@ }

const CursorClass = options.cursorFactory || this.s.Cursor;
translateReadPreference(options);
return new CursorClass(this.s.bson, ns, cmd, options, topology, this.s.options);
}
get clientInfo() {
return this.s.clientInfo;
}
// Legacy methods for compat with old topology types
isConnected() {
// console.log('not implemented: `isConnected`');
return true;
}
isDestroyed() {
// console.log('not implemented: `isDestroyed`');
return false;
}
unref() {
console.log('not implemented: `unref`');
}
// NOTE: There are many places in code where we explicitly check the last isMaster
// to do feature support detection. This should be done any other way, but for
// now we will just return the first isMaster seen, which should suffice.
lastIsMaster() {
const serverDescriptions = Array.from(this.description.servers.values());
if (serverDescriptions.length === 0) return {};
const sd = serverDescriptions.filter(sd => sd.type !== ServerType.Unknown)[0];
const result = sd || { maxWireVersion: this.description.commonWireVersion };
return result;
}
get logicalSessionTimeoutMinutes() {
return this.description.logicalSessionTimeoutMinutes;
}
get bson() {
return this.s.bson;
}
}
Object.defineProperty(Topology.prototype, 'clusterTime', {
enumerable: true,
get: function() {
return this.s.clusterTime;
},
set: function(clusterTime) {
this.s.clusterTime = clusterTime;
}
});
// legacy aliases

@@ -410,5 +666,41 @@ Topology.prototype.destroy = deprecate(

const RETRYABLE_WRITE_OPERATIONS = ['findAndModify', 'insert', 'update', 'delete'];
function isWriteCommand(command) {
return RETRYABLE_WRITE_OPERATIONS.some(op => command[op]);
}
/**
* Destroys a server, and removes all event listeners from the instance
*
* @param {Server} server
*/
function destroyServer(server, topology, callback) {
LOCAL_SERVER_EVENTS.forEach(event => server.removeAllListeners(event));
server.destroy(() => {
topology.emit(
'serverClosed',
new monitoring.ServerClosedEvent(topology.s.id, server.description.address)
);
if (typeof callback === 'function') callback(null, null);
});
}
/**
* Parses a basic seedlist in string form
*
* @param {string} seedlist The seedlist to parse
*/
function parseStringSeedlist(seedlist) {
return seedlist.split(',').map(seed => ({
host: seed.split(':')[0],
port: seed.split(':')[1] || 27017
}));
}
function topologyTypeFromSeedlist(seedlist, options) {
if (seedlist.length === 1 && !options.replicaSet) return TopologyType.Single;
if (options.replicaSet) return TopologyType.ReplicaSetNoPrimary;
const replicaSet = options.replicaSet || options.setName || options.rs_name;
if (seedlist.length === 1 && !replicaSet) return TopologyType.Single;
if (replicaSet) return TopologyType.ReplicaSetNoPrimary;
return TopologyType.Unknown;

@@ -432,5 +724,39 @@ }

function selectServers(topology, selector, timeout, start, callback) {
const duration = calculateDurationInMs(start);
if (duration >= timeout) {
return callback(new MongoTimeoutError(`Server selection timed out after ${timeout} ms`));
}
// ensure we are connected
if (!topology.s.connected) {
topology.connect();
// we want to make sure we're still within the requested timeout window
const failToConnectTimer = setTimeout(() => {
topology.removeListener('connect', connectHandler);
callback(new MongoTimeoutError('Server selection timed out waiting to connect'));
}, timeout - duration);
const connectHandler = () => {
clearTimeout(failToConnectTimer);
selectServers(topology, selector, timeout, process.hrtime(), callback);
};
topology.once('connect', connectHandler);
return;
}
// otherwise, attempt server selection
const serverDescriptions = Array.from(topology.description.servers.values());
let descriptions;
// support server selection by options with readPreference
if (typeof selector === 'object') {
const readPreference = selector.readPreference
? selector.readPreference
: ReadPreference.primary;
selector = readPreferenceServerSelector(readPreference);
}
try {

@@ -449,44 +775,52 @@ descriptions = selector

const duration = calculateDurationInMs(start);
if (duration >= timeout) {
return callback(new MongoTimeoutError(`Server selection timed out after ${timeout} ms`));
}
const retrySelection = () => {
// ensure all server monitors attempt monitoring immediately
topology.s.servers.forEach(server => server.monitor());
// ensure all server monitors attempt monitoring soon
topology.s.servers.forEach(server => {
setTimeout(
() => server.monitor({ heartbeatFrequencyMS: topology.description.heartbeatFrequencyMS }),
TOPOLOGY_DEFAULTS.minHeartbeatFrequencyMS
);
});
const iterationTimer = setTimeout(() => {
callback(new MongoTimeoutError('Server selection timed out due to monitoring'));
}, topology.s.minHeartbeatIntervalMS);
topology.once('topologyDescriptionChanged', () => {
const descriptionChangedHandler = () => {
// successful iteration, clear the check timer
clearTimeout(iterationTimer);
if (topology.description.error) {
callback(topology.description.error, null);
return;
}
// topology description has changed due to monitoring, reattempt server selection
selectServers(topology, selector, timeout, start, callback);
});
};
};
// ensure we are connected
if (!topology.s.connected) {
topology.connect();
// we want to make sure we're still within the requested timeout window
const failToConnectTimer = setTimeout(() => {
callback(new MongoTimeoutError('Server selection timed out waiting to connect'));
const iterationTimer = setTimeout(() => {
topology.removeListener('topologyDescriptionChanged', descriptionChangedHandler);
callback(new MongoTimeoutError(`Server selection timed out after ${timeout} ms`));
}, timeout - duration);
topology.once('connect', () => {
clearTimeout(failToConnectTimer);
retrySelection();
});
topology.once('topologyDescriptionChanged', descriptionChangedHandler);
};
return;
}
retrySelection();
}
function createAndConnectServer(topology, serverDescription) {
topology.emit(
'serverOpening',
new monitoring.ServerOpeningEvent(topology.s.id, serverDescription.address)
);
const server = new Server(serverDescription, topology.s.options, topology);
relayEvents(server, topology, SERVER_RELAY_EVENTS);
server.once('connect', serverConnectEventHandler(server, topology));
server.on('descriptionReceived', topology.serverUpdateHandler.bind(topology));
server.on('error', serverErrorEventHandler(server, topology));
server.on('close', () => topology.emit('close', server));
server.connect();
return server;
}
/**

@@ -501,19 +835,4 @@ * Create `Server` instances for all initially known servers, connect them, and assign

topology.s.servers = serverDescriptions.reduce((servers, serverDescription) => {
// publish an open event for each ServerDescription created
topology.emit(
'serverOpening',
new monitoring.ServerOpeningEvent(topology.s.id, serverDescription.address)
);
const server = new Server(serverDescription, topology.s.options);
relayEvents(server, topology, [
'serverHeartbeatStarted',
'serverHeartbeatSucceeded',
'serverHeartbeatFailed'
]);
server.on('descriptionReceived', topology.serverUpdateHandler.bind(topology));
server.on('connect', serverConnectEventHandler(server, topology));
const server = createAndConnectServer(topology, serverDescription);
servers.set(serverDescription.address, server);
server.connect();
return servers;

@@ -523,7 +842,7 @@ }, new Map());

function updateServers(topology, currentServerDescription) {
function updateServers(topology, incomingServerDescription) {
// update the internal server's description
if (topology.s.servers.has(currentServerDescription.address)) {
const server = topology.s.servers.get(currentServerDescription.address);
server.s.description = currentServerDescription;
if (topology.s.servers.has(incomingServerDescription.address)) {
const server = topology.s.servers.get(incomingServerDescription.address);
server.s.description = incomingServerDescription;
}

@@ -534,18 +853,4 @@

if (!topology.s.servers.has(serverDescription.address)) {
topology.emit(
'serverOpening',
new monitoring.ServerOpeningEvent(topology.s.id, serverDescription.address)
);
const server = new Server(serverDescription, topology.s.options);
relayEvents(server, topology, [
'serverHeartbeatStarted',
'serverHeartbeatSucceeded',
'serverHeartbeatFailed'
]);
server.on('descriptionReceived', topology.serverUpdateHandler.bind(topology));
server.on('connect', serverConnectEventHandler(server, topology));
const server = createAndConnectServer(topology, serverDescription);
topology.s.servers.set(serverDescription.address, server);
server.connect();
}

@@ -564,5 +869,4 @@ }

server.destroy(() =>
topology.emit('serverClosed', new monitoring.ServerClosedEvent(topology.s.id, serverAddress))
);
// prepare server for garbage collection
destroyServer(server, topology);
}

@@ -572,7 +876,26 @@ }

function serverConnectEventHandler(server, topology) {
return function(/* ismaster */) {
topology.emit('connect', topology);
return function(/* isMaster, err */) {
server.monitor({
initial: true,
heartbeatFrequencyMS: topology.description.heartbeatFrequencyMS
});
};
}
function serverErrorEventHandler(server, topology) {
return function(err) {
topology.emit(
'serverClosed',
new monitoring.ServerClosedEvent(topology.s.id, server.description.address)
);
if (err instanceof MongoParseError) {
resetServerState(server, err, { clearPool: true });
return;
}
resetServerState(server, err);
};
}
function executeWriteOperation(args, options, callback) {

@@ -590,3 +913,3 @@ if (typeof options === 'function') (callback = options), (options = {});

!args.retrying &&
options.retryWrites &&
!!options.retryWrites &&
options.session &&

@@ -596,3 +919,3 @@ isRetryableWritesSupported(topology) &&

topology.selectServer(writableServerSelector(), (err, server) => {
topology.selectServer(writableServerSelector(), options, (err, server) => {
if (err) {

@@ -605,3 +928,3 @@ callback(err, null);

if (!err) return callback(null, result);
if (!(err instanceof MongoNetworkError) && !err.message.match(/not master/)) {
if (!isRetryableError(err)) {
return callback(err);

@@ -630,8 +953,52 @@ }

server[op](ns, ops, options, handler);
});
}
// we need to increment the statement id if we're in a transaction
if (options.session && options.session.inTransaction()) {
options.session.incrementStatementId(ops.length);
/**
* Resets the internal state of this server to `Unknown` by simulating an empty ismaster
*
* @private
* @param {Server} server
* @param {MongoError} error The error that caused the state reset
* @param {object} [options] Optional settings
* @param {boolean} [options.clearPool=false] Pool should be cleared out on state reset
*/
function resetServerState(server, error, options) {
options = Object.assign({}, { clearPool: false }, options);
function resetState() {
server.emit(
'descriptionReceived',
new ServerDescription(server.description.address, null, { error })
);
}
if (options.clearPool && server.pool) {
server.pool.reset(() => resetState());
return;
}
resetState();
}
function translateReadPreference(options) {
if (options.readPreference == null) {
return;
}
let r = options.readPreference;
if (typeof r === 'string') {
options.readPreference = new ReadPreference(r);
} else if (r && !(r instanceof ReadPreference) && typeof r === 'object') {
const mode = r.mode || r.preference;
if (mode && typeof mode === 'string') {
options.readPreference = new ReadPreference(mode, r.tags, {
maxStalenessSeconds: r.maxStalenessSeconds
});
}
});
} else if (!(r instanceof ReadPreference)) {
throw new TypeError('Invalid read preference: ' + r);
}
return options;
}

@@ -702,2 +1069,23 @@

/**
* An event emitted indicating a command was started, if command monitoring is enabled
*
* @event Topology#commandStarted
* @type {object}
*/
/**
* An event emitted indicating a command succeeded, if command monitoring is enabled
*
* @event Topology#commandSucceeded
* @type {object}
*/
/**
* An event emitted indicating a command failed, if command monitoring is enabled
*
* @event Topology#commandFailed
* @type {object}
*/
module.exports = Topology;

@@ -14,2 +14,3 @@ 'use strict';

const TxnState = require('./transactions').TxnState;
const isPromiseLike = require('./utils').isPromiseLike;

@@ -247,4 +248,119 @@ function assertAlive(session, callback) {

}
/**
* A user provided function to be run within a transaction
*
* @callback WithTransactionCallback
* @param {ClientSession} session The parent session of the transaction running the operation. This should be passed into each operation within the lambda.
* @returns {Promise} The resulting Promise of operations run within this transaction
*/
/**
* Runs a provided lambda within a transaction, retrying either the commit operation
* or entire transaction as needed (and when the error permits) to better ensure that
* the transaction can complete successfully.
*
* IMPORTANT: This method requires the user to return a Promise, all lambdas that do not
* return a Promise will result in undefined behavior.
*
* @param {WithTransactionCallback} fn
* @param {TransactionOptions} [options] Optional settings for the transaction
*/
withTransaction(fn, options) {
const startTime = Date.now();
return attemptTransaction(this, startTime, fn, options);
}
}
const MAX_WITH_TRANSACTION_TIMEOUT = 120000;
const UNSATISFIABLE_WRITE_CONCERN_CODE = 100;
const UNKNOWN_REPL_WRITE_CONCERN_CODE = 79;
const NON_DETERMINISTIC_WRITE_CONCERN_ERRORS = new Set([
'CannotSatisfyWriteConcern',
'UnknownReplWriteConcern',
'UnsatisfiableWriteConcern'
]);
function hasNotTimedOut(startTime, max) {
return Date.now() - startTime < max;
}
function isUnknownTransactionCommitResult(err) {
return (
!NON_DETERMINISTIC_WRITE_CONCERN_ERRORS.has(err.codeName) &&
err.code !== UNSATISFIABLE_WRITE_CONCERN_CODE &&
err.code !== UNKNOWN_REPL_WRITE_CONCERN_CODE
);
}
function attemptTransactionCommit(session, startTime, fn, options) {
return session.commitTransaction().catch(err => {
if (err instanceof MongoError && hasNotTimedOut(startTime, MAX_WITH_TRANSACTION_TIMEOUT)) {
if (err.hasErrorLabel('UnknownTransactionCommitResult')) {
return attemptTransactionCommit(session, startTime, fn, options);
}
if (err.hasErrorLabel('TransientTransactionError')) {
return attemptTransaction(session, startTime, fn, options);
}
}
throw err;
});
}
const USER_EXPLICIT_TXN_END_STATES = new Set([
TxnState.NO_TRANSACTION,
TxnState.TRANSACTION_COMMITTED,
TxnState.TRANSACTION_ABORTED
]);
function userExplicitlyEndedTransaction(session) {
return USER_EXPLICIT_TXN_END_STATES.has(session.transaction.state);
}
function attemptTransaction(session, startTime, fn, options) {
session.startTransaction(options);
let promise;
try {
promise = fn(session);
} catch (err) {
promise = Promise.reject(err);
}
if (!isPromiseLike(promise)) {
session.abortTransaction();
throw new TypeError('Function provided to `withTransaction` must return a Promise');
}
return promise
.then(() => {
if (userExplicitlyEndedTransaction(session)) {
return;
}
return attemptTransactionCommit(session, startTime, fn, options);
})
.catch(err => {
function maybeRetryOrThrow(err) {
if (
err instanceof MongoError &&
err.hasErrorLabel('TransientTransactionError') &&
hasNotTimedOut(startTime, MAX_WITH_TRANSACTION_TIMEOUT)
) {
return attemptTransaction(session, startTime, fn, options);
}
throw err;
}
if (session.transaction.isActive) {
return session.abortTransaction().then(() => maybeRetryOrThrow(err));
}
return maybeRetryOrThrow(err);
});
}
function endTransaction(session, commandName, callback) {

@@ -305,8 +421,17 @@ if (!assertAlive(session, callback)) {

// apply a writeConcern if specified
let writeConcern;
if (session.transaction.options.writeConcern) {
Object.assign(command, { writeConcern: session.transaction.options.writeConcern });
writeConcern = Object.assign({}, session.transaction.options.writeConcern);
} else if (session.clientOptions && session.clientOptions.w) {
Object.assign(command, { writeConcern: { w: session.clientOptions.w } });
writeConcern = { w: session.clientOptions.w };
}
if (txnState === TxnState.TRANSACTION_COMMITTED) {
writeConcern = Object.assign({ wtimeout: 10000 }, writeConcern, { w: 'majority' });
}
if (writeConcern) {
Object.assign(command, { writeConcern });
}
function commandHandler(e, r) {

@@ -331,3 +456,8 @@ if (commandName === 'commitTransaction') {

e.errorLabels.push('UnknownTransactionCommitResult');
if (isUnknownTransactionCommitResult(e)) {
e.errorLabels.push('UnknownTransactionCommitResult');
// per txns spec, must unpin session in this case
session.transaction.unpinServer();
}
}

@@ -346,5 +476,19 @@ } else {

if (session.transaction.recoveryToken) {
command.recoveryToken = session.transaction.recoveryToken;
}
// send the command
session.topology.command('admin.$cmd', command, { session }, (err, reply) => {
if (err && isRetryableError(err)) {
// SPEC-1185: apply majority write concern when retrying commitTransaction
if (command.commitTransaction) {
// per txns spec, must unpin session in this case
session.transaction.unpinServer();
command.writeConcern = Object.assign({ wtimeout: 10000 }, command.writeConcern, {
w: 'majority'
});
}
return session.topology.command('admin.$cmd', command, { session }, (_err, _reply) =>

@@ -351,0 +495,0 @@ commandHandler(transactionError(_err), _reply)

@@ -40,4 +40,2 @@ 'use strict';

const defaultAuthProviders = require('../auth/defaultAuthProviders').defaultAuthProviders;
//

@@ -95,3 +93,2 @@ // States

* @param {number} [options.socketTimeout=0] TCP Socket timeout setting
* @param {boolean} [options.singleBufferSerializtion=true] Serialize into single buffer, trade of peak memory for serialization speed
* @param {boolean} [options.ssl=false] Use SSL for connection

@@ -175,5 +172,3 @@ * @param {boolean|function} [options.checkServerIdentity=true] Ensure we check server identify during SSL, set to false to disable checking. Only works for Node 0.12.x or higher. You can pass in a boolean or your own checkServerIdentity override function.

// Client info
clientInfo: createClientInfo(options),
// Authentication context
authenticationContexts: []
clientInfo: createClientInfo(options)
};

@@ -200,5 +195,2 @@

// All the authProviders
this.authProviders = options.authProviders || defaultAuthProviders(this.s.bson);
// Disconnected state

@@ -213,4 +205,2 @@ this.state = DISCONNECTED;

this.disconnectedProxies = [];
// Are we authenticating
this.authenticating = false;
// Index of proxy to run operations against

@@ -280,4 +270,2 @@ this.index = 0;

* Initiate server connect
* @method
* @param {array} [options.auth=null] Array of auth options to apply on connect
*/

@@ -296,3 +284,2 @@ Mongos.prototype.connect = function(options) {

Object.assign({}, self.s.options, x, options, {
authProviders: self.authProviders,
reconnect: false,

@@ -316,2 +303,12 @@ monitoring: false,

/**
* Authenticate the topology.
* @method
* @param {MongoCredentials} credentials The credentials for authentication we are using
* @param {authResultCallback} callback A callback function
*/
Mongos.prototype.auth = function(credentials, callback) {
callback(null, null);
};
function handleEvent(self) {

@@ -349,56 +346,53 @@ return function() {

if (event === 'connect') {
// Do we have authentication contexts that need to be applied
applyAuthenticationContexts(self, _this, function() {
// Get last known ismaster
self.ismaster = _this.lastIsMaster();
// Get last known ismaster
self.ismaster = _this.lastIsMaster();
// Is this not a proxy, remove t
if (self.ismaster.msg === 'isdbgrid') {
// Add to the connectd list
for (var i = 0; i < self.connectedProxies.length; i++) {
if (self.connectedProxies[i].name === _this.name) {
// Move from connectingProxies
moveServerFrom(self.connectingProxies, self.disconnectedProxies, _this);
// Emit the initial topology
emitTopologyDescriptionChanged(self);
_this.destroy();
return self.emit('failed', _this);
}
// Is this not a proxy, remove t
if (self.ismaster.msg === 'isdbgrid') {
// Add to the connectd list
for (let i = 0; i < self.connectedProxies.length; i++) {
if (self.connectedProxies[i].name === _this.name) {
// Move from connectingProxies
moveServerFrom(self.connectingProxies, self.disconnectedProxies, _this);
// Emit the initial topology
emitTopologyDescriptionChanged(self);
_this.destroy();
return self.emit('failed', _this);
}
}
// Remove the handlers
for (i = 0; i < handlers.length; i++) {
_this.removeAllListeners(handlers[i]);
}
// Remove the handlers
for (let i = 0; i < handlers.length; i++) {
_this.removeAllListeners(handlers[i]);
}
// Add stable state handlers
_this.on('error', handleEvent(self, 'error'));
_this.on('close', handleEvent(self, 'close'));
_this.on('timeout', handleEvent(self, 'timeout'));
_this.on('parseError', handleEvent(self, 'parseError'));
// Add stable state handlers
_this.on('error', handleEvent(self, 'error'));
_this.on('close', handleEvent(self, 'close'));
_this.on('timeout', handleEvent(self, 'timeout'));
_this.on('parseError', handleEvent(self, 'parseError'));
// Move from connecting proxies connected
moveServerFrom(self.connectingProxies, self.connectedProxies, _this);
// Emit the joined event
self.emit('joined', 'mongos', _this);
} else {
// Print warning if we did not find a mongos proxy
if (self.s.logger.isWarn()) {
var message = 'expected mongos proxy, but found replicaset member mongod for server %s';
// We have a standalone server
if (!self.ismaster.hosts) {
message = 'expected mongos proxy, but found standalone mongod for server %s';
}
self.s.logger.warn(f(message, _this.name));
// Move from connecting proxies connected
moveServerFrom(self.connectingProxies, self.connectedProxies, _this);
// Emit the joined event
self.emit('joined', 'mongos', _this);
} else {
// Print warning if we did not find a mongos proxy
if (self.s.logger.isWarn()) {
var message = 'expected mongos proxy, but found replicaset member mongod for server %s';
// We have a standalone server
if (!self.ismaster.hosts) {
message = 'expected mongos proxy, but found standalone mongod for server %s';
}
// This is not a mongos proxy, remove it completely
removeProxyFrom(self.connectingProxies, _this);
// Emit the left event
self.emit('left', 'server', _this);
// Emit failed event
self.emit('failed', _this);
self.s.logger.warn(f(message, _this.name));
}
});
// This is not a mongos proxy, remove it completely
removeProxyFrom(self.connectingProxies, _this);
// Emit the left event
self.emit('left', 'server', _this);
// Emit failed event
self.emit('failed', _this);
}
} else {

@@ -482,3 +476,14 @@ moveServerFrom(self.connectingProxies, self.disconnectedProxies, this);

function pickProxy(self) {
function pickProxy(self, session) {
// TODO: Destructure :)
const transaction = session && session.transaction;
if (transaction && transaction.server) {
if (transaction.server.isConnected()) {
return transaction.server;
} else {
transaction.unpinServer();
}
}
// Get the currently connected Proxies

@@ -507,11 +512,18 @@ var connectedProxies = self.connectedProxies.slice(0);

let proxy;
// We have no connectedProxies pick first of the connected ones
if (connectedProxies.length === 0) {
return self.connectedProxies[0];
proxy = self.connectedProxies[0];
} else {
// Get proxy
proxy = connectedProxies[self.index % connectedProxies.length];
// Update the index
self.index = (self.index + 1) % connectedProxies.length;
}
// Get proxy
var proxy = connectedProxies[self.index % connectedProxies.length];
// Update the index
self.index = (self.index + 1) % connectedProxies.length;
if (transaction && transaction.isActive && proxy && proxy.isConnected()) {
transaction.pinServer(proxy);
}
// Return the proxy

@@ -561,29 +573,26 @@ return proxy;

if (event === 'connect' && !self.authenticating) {
// Do we have authentication contexts that need to be applied
applyAuthenticationContexts(self, _self, function() {
// Destroyed
if (self.state === DESTROYED || self.state === UNREFERENCED) {
moveServerFrom(self.connectingProxies, self.disconnectedProxies, _self);
return _self.destroy();
}
if (event === 'connect') {
// Destroyed
if (self.state === DESTROYED || self.state === UNREFERENCED) {
moveServerFrom(self.connectingProxies, self.disconnectedProxies, _self);
return _self.destroy();
}
// Remove the handlers
for (var i = 0; i < handlers.length; i++) {
_self.removeAllListeners(handlers[i]);
}
// Remove the handlers
for (var i = 0; i < handlers.length; i++) {
_self.removeAllListeners(handlers[i]);
}
// Add stable state handlers
_self.on('error', handleEvent(self, 'error'));
_self.on('close', handleEvent(self, 'close'));
_self.on('timeout', handleEvent(self, 'timeout'));
_self.on('parseError', handleEvent(self, 'parseError'));
// Add stable state handlers
_self.on('error', handleEvent(self, 'error'));
_self.on('close', handleEvent(self, 'close'));
_self.on('timeout', handleEvent(self, 'timeout'));
_self.on('parseError', handleEvent(self, 'parseError'));
// Move to the connected servers
moveServerFrom(self.connectingProxies, self.connectedProxies, _self);
// Emit topology Change
emitTopologyDescriptionChanged(self);
// Emit joined event
self.emit('joined', 'mongos', _self);
});
// Move to the connected servers
moveServerFrom(self.connectingProxies, self.connectedProxies, _self);
// Emit topology Change
emitTopologyDescriptionChanged(self);
// Emit joined event
self.emit('joined', 'mongos', _self);
} else {

@@ -620,3 +629,2 @@ // Move from connectingProxies

port: parseInt(_server.name.split(':')[1], 10),
authProviders: self.authProviders,
reconnect: false,

@@ -663,31 +671,2 @@ monitoring: false,

function applyAuthenticationContexts(self, server, callback) {
if (self.s.authenticationContexts.length === 0) {
return callback();
}
// Copy contexts to ensure no modificiation in the middle of
// auth process.
var authContexts = self.s.authenticationContexts.slice(0);
// Apply one of the contexts
function applyAuth(authContexts, server, callback) {
if (authContexts.length === 0) return callback();
// Get the first auth context
var authContext = authContexts.shift();
// Copy the params
var customAuthContext = authContext.slice(0);
// Push our callback handler
customAuthContext.push(function(/* err */) {
applyAuth(authContexts, server, callback);
});
// Attempt authentication
server.auth.apply(server, customAuthContext);
}
// Apply all auth contexts
applyAuth(authContexts, server, callback);
}
function topologyMonitor(self, options) {

@@ -854,4 +833,2 @@ options = options || {};

if (this.haTimeoutId) clearTimeout(this.haTimeoutId);
// Clear out authentication contexts
this.s.authenticationContexts = [];

@@ -899,24 +876,27 @@ // Destroy all connecting servers

// Execute write operation
var executeWriteOperation = function(self, op, ns, ops, options, callback) {
function executeWriteOperation(args, options, callback) {
if (typeof options === 'function') (callback = options), (options = {});
options = options || {};
// TODO: once we drop Node 4, use destructuring either here or in arguments.
const self = args.self;
const op = args.op;
const ns = args.ns;
const ops = args.ops;
// Pick a server
let server = pickProxy(self);
let server = pickProxy(self, options.session);
// No server found error out
if (!server) return callback(new MongoError('no mongos proxy available'));
if (!options.retryWrites || !options.session || !isRetryableWritesSupported(self)) {
// Execute the command
return server[op](ns, ops, options, callback);
}
const willRetryWrite =
!args.retrying &&
!!options.retryWrites &&
options.session &&
isRetryableWritesSupported(self) &&
!options.session.inTransaction();
// increment and assign txnNumber
options.willRetryWrite = true;
options.session.incrementTransactionNumber();
server[op](ns, ops, options, (err, result) => {
const handler = (err, result) => {
if (!err) return callback(null, result);
if (!isRetryableError(err)) {
if (!isRetryableError(err) || !willRetryWrite) {
return callback(err);

@@ -926,14 +906,27 @@ }

// Pick another server
server = pickProxy(self);
server = pickProxy(self, options.session);
// No server found error out with original error
if (!server || !isRetryableWritesSupported(server)) {
if (!server) {
return callback(err);
}
// rerun the operation
server[op](ns, ops, options, callback);
});
};
const newArgs = Object.assign({}, args, { retrying: true });
return executeWriteOperation(newArgs, options, callback);
};
if (callback.operationId) {
handler.operationId = callback.operationId;
}
// increment and assign txnNumber
if (willRetryWrite) {
options.session.incrementTransactionNumber();
options.willRetryWrite = willRetryWrite;
}
// rerun the operation
server[op](ns, ops, options, handler);
}
/**

@@ -970,3 +963,3 @@ * Insert one or more documents

// Execute write operation
executeWriteOperation(this, 'insert', ns, ops, options, callback);
executeWriteOperation({ self: this, op: 'insert', ns, ops }, options, callback);
};

@@ -1005,3 +998,3 @@

// Execute write operation
executeWriteOperation(this, 'update', ns, ops, options, callback);
executeWriteOperation({ self: this, op: 'update', ns, ops }, options, callback);
};

@@ -1040,3 +1033,3 @@

// Execute write operation
executeWriteOperation(this, 'remove', ns, ops, options, callback);
executeWriteOperation({ self: this, op: 'remove', ns, ops }, options, callback);
};

@@ -1071,3 +1064,3 @@

// Pick a proxy
var server = pickProxy(self);
var server = pickProxy(self, options.session);

@@ -1148,160 +1141,2 @@ // Topology is not connected, save the call in the provided store to be

/**
* Authenticate using a specified mechanism
* @method
* @param {string} mechanism The Auth mechanism we are invoking
* @param {string} db The db we are invoking the mechanism against
* @param {...object} param Parameters for the specific mechanism
* @param {authResultCallback} callback A callback function
*/
Mongos.prototype.auth = function(mechanism, db) {
var allArgs = Array.prototype.slice.call(arguments, 0).slice(0);
var self = this;
var args = Array.prototype.slice.call(arguments, 2);
var callback = args.pop();
var currentContextIndex = 0;
// If we don't have the mechanism fail
if (this.authProviders[mechanism] == null && mechanism !== 'default') {
return callback(new MongoError(f('auth provider %s does not exist', mechanism)));
}
// Are we already authenticating, throw
if (this.authenticating) {
return callback(new MongoError('authentication or logout allready in process'));
}
// Topology is not connected, save the call in the provided store to be
// Executed at some point when the handler deems it's reconnected
if (!self.isConnected() && self.s.disconnectHandler != null) {
return self.s.disconnectHandler.add('auth', db, allArgs, {}, callback);
}
// Set to authenticating
this.authenticating = true;
// All errors
var errors = [];
// Get all the servers
var servers = this.connectedProxies.slice(0);
// No servers return
if (servers.length === 0) {
this.authenticating = false;
callback(null, true);
}
// Authenticate
function auth(server) {
// Arguments without a callback
var argsWithoutCallback = [mechanism, db].concat(args.slice(0));
// Create arguments
var finalArguments = argsWithoutCallback.concat([
function(err) {
count = count - 1;
// Save all the errors
if (err) errors.push({ name: server.name, err: err });
// We are done
if (count === 0) {
// Auth is done
self.authenticating = false;
// Return the auth error
if (errors.length) {
// Remove the entry from the stored authentication contexts
self.s.authenticationContexts.splice(currentContextIndex, 0);
// Return error
return callback(
new MongoError({
message: 'authentication fail',
errors: errors
}),
false
);
}
// Successfully authenticated session
callback(null, self);
}
}
]);
// Execute the auth only against non arbiter servers
if (!server.lastIsMaster().arbiterOnly) {
server.auth.apply(server, finalArguments);
}
}
// Save current context index
currentContextIndex = this.s.authenticationContexts.length;
// Store the auth context and return the last index
this.s.authenticationContexts.push([mechanism, db].concat(args.slice(0)));
// Get total count
var count = servers.length;
// Authenticate against all servers
while (servers.length > 0) {
auth(servers.shift());
}
};
/**
* Logout from a database
* @method
* @param {string} db The db we are logging out from
* @param {authResultCallback} callback A callback function
*/
Mongos.prototype.logout = function(dbName, callback) {
var self = this;
// Are we authenticating or logging out, throw
if (this.authenticating) {
throw new MongoError('authentication or logout allready in process');
}
// Ensure no new members are processed while logging out
this.authenticating = true;
// Remove from all auth providers (avoid any reaplication of the auth details)
var providers = Object.keys(this.authProviders);
for (var i = 0; i < providers.length; i++) {
this.authProviders[providers[i]].logout(dbName);
}
// Now logout all the servers
var servers = this.connectedProxies.slice(0);
var count = servers.length;
if (count === 0) return callback();
var errors = [];
function logoutServer(_server, cb) {
_server.logout(dbName, function(err) {
if (err) errors.push({ name: _server.name, err: err });
cb();
});
}
// Execute logout on all server instances
for (i = 0; i < servers.length; i++) {
logoutServer(servers[i], function() {
count = count - 1;
if (count === 0) {
// Do not block new operations
self.authenticating = false;
// If we have one or more errors
if (errors.length)
return callback(
new MongoError({
message: f('logout failed against db %s', dbName),
errors: errors
}),
false
);
// No errors
callback();
}
});
}
};
/**
* Selects a server

@@ -1311,3 +1146,4 @@ *

* @param {function} selector Unused
* @param {ReadPreference} [options.readPreference] Specify read preference if command supports it
* @param {ReadPreference} [options.readPreference] Unused
* @param {ClientSession} [options.session] Specify a session if it is being used
* @param {function} callback

@@ -1322,3 +1158,3 @@ */

const server = pickProxy(this);
const server = pickProxy(this, options.session);
if (this.s.debug) this.emit('pickedServer', null, server);

@@ -1325,0 +1161,0 @@ callback(null, server);

@@ -290,16 +290,14 @@ 'use strict';

if (ismaster && ismaster.msg === 'isdbgrid') {
if (this.primary && this.primary.name === serverName) {
this.primary = null;
this.topologyType = TopologyType.ReplicaSetNoPrimary;
}
return false;
}
// A RSOther instance
if (
(ismaster.setName && ismaster.hidden) ||
(ismaster.setName &&
!ismaster.ismaster &&
!ismaster.secondary &&
!ismaster.arbiterOnly &&
!ismaster.passive)
) {
// A RSGhost instance
if (ismaster.isreplicaset) {
self.set[serverName] = {
type: ServerType.RSOther,
type: ServerType.RSGhost,
setVersion: null,

@@ -309,2 +307,7 @@ electionId: null,

};
if (this.primary && this.primary.name === serverName) {
this.primary = null;
}
// Set the topology

@@ -315,12 +318,21 @@ this.topologyType = this.primary

if (ismaster.setName) this.setName = ismaster.setName;
// Set the topology
return false;
}
// A RSGhost instance
if (ismaster.isreplicaset) {
// A RSOther instance
if (
(ismaster.setName && ismaster.hidden) ||
(ismaster.setName &&
!ismaster.ismaster &&
!ismaster.secondary &&
!ismaster.arbiterOnly &&
!ismaster.passive)
) {
self.set[serverName] = {
type: ServerType.RSGhost,
type: ServerType.RSOther,
setVersion: null,
electionId: null,
setName: null
setName: ismaster.setName
};

@@ -333,4 +345,2 @@

if (ismaster.setName) this.setName = ismaster.setName;
// Set the topology
return false;

@@ -337,0 +347,0 @@ }

@@ -21,7 +21,4 @@ 'use strict';

const isRetryableError = require('../error').isRetryableError;
const BSON = retrieveBSON();
const defaultAuthProviders = require('../auth/defaultAuthProviders').defaultAuthProviders;
var BSON = retrieveBSON();
//

@@ -81,3 +78,2 @@ // States

* @param {number} [options.socketTimeout=0] TCP Socket timeout setting
* @param {boolean} [options.singleBufferSerializtion=true] Serialize into single buffer, trade of peak memory for serialization speed
* @param {boolean} [options.ssl=false] Use SSL for connection

@@ -196,5 +192,3 @@ * @param {boolean|function} [options.checkServerIdentity=true] Ensure we check server identify during SSL, set to false to disable checking. Only works for Node 0.12.x or higher. You can pass in a boolean or your own checkServerIdentity override function.

// Client info
clientInfo: createClientInfo(options),
// Authentication context
authenticationContexts: []
clientInfo: createClientInfo(options)
};

@@ -223,5 +217,2 @@

// All the authProviders
this.authProviders = options.authProviders || defaultAuthProviders(this.s.bson);
// Add forwarding of events from state handler

@@ -245,4 +236,2 @@ var types = ['joined', 'left'];

this.haTimeoutId = null;
// Are we authenticating
this.authenticating = false;
// Last ismaster

@@ -309,3 +298,3 @@ this.ismaster = null;

if (event === 'connect' && !self.authenticating) {
if (event === 'connect') {
// Destroyed

@@ -316,40 +305,30 @@ if (self.state === DESTROYED || self.state === UNREFERENCED) {

// Do we have authentication contexts that need to be applied
applyAuthenticationContexts(self, _self, function() {
// Destroy the instance
if (self.state === DESTROYED || self.state === UNREFERENCED) {
return _self.destroy({ force: true });
// Update the state
var result = self.s.replicaSetState.update(_self);
// Update the state with the new server
if (result) {
// Primary lastIsMaster store it
if (_self.lastIsMaster() && _self.lastIsMaster().ismaster) {
self.ismaster = _self.lastIsMaster();
}
// Update the state
var result = self.s.replicaSetState.update(_self);
// Update the state with the new server
if (result) {
// Primary lastIsMaster store it
if (_self.lastIsMaster() && _self.lastIsMaster().ismaster) {
self.ismaster = _self.lastIsMaster();
}
// Remove the handlers
for (let i = 0; i < handlers.length; i++) {
_self.removeAllListeners(handlers[i]);
}
// Remove the handlers
for (var i = 0; i < handlers.length; i++) {
_self.removeAllListeners(handlers[i]);
}
// Add stable state handlers
_self.on('error', handleEvent(self, 'error'));
_self.on('close', handleEvent(self, 'close'));
_self.on('timeout', handleEvent(self, 'timeout'));
_self.on('parseError', handleEvent(self, 'parseError'));
// Add stable state handlers
_self.on('error', handleEvent(self, 'error'));
_self.on('close', handleEvent(self, 'close'));
_self.on('timeout', handleEvent(self, 'timeout'));
_self.on('parseError', handleEvent(self, 'parseError'));
// Enalbe the monitoring of the new server
monitorServer(_self.lastIsMaster().me, self, {});
// Enalbe the monitoring of the new server
monitorServer(_self.lastIsMaster().me, self, {});
// Rexecute any stalled operation
rexecuteOperations(self);
} else {
_self.destroy({ force: true });
}
});
} else if (event === 'connect' && self.authenticating) {
this.destroy({ force: true });
// Rexecute any stalled operation
rexecuteOperations(self);
} else {
_self.destroy({ force: true });
}
} else if (event === 'error') {

@@ -385,3 +364,2 @@ error = err;

port: parseInt(_server.split(':')[1], 10),
authProviders: self.authProviders,
reconnect: false,

@@ -737,36 +715,2 @@ monitoring: false,

function applyAuthenticationContexts(self, server, callback) {
if (self.s.authenticationContexts.length === 0) {
return callback();
}
// Do not apply any auth contexts if it's an arbiter
if (server.lastIsMaster() && server.lastIsMaster().arbiterOnly) {
return callback();
}
// Copy contexts to ensure no modificiation in the middle of
// auth process.
var authContexts = self.s.authenticationContexts.slice(0);
// Apply one of the contexts
function applyAuth(authContexts, server, callback) {
if (authContexts.length === 0) return callback();
// Get the first auth context
var authContext = authContexts.shift();
// Copy the params
var customAuthContext = authContext.slice(0);
// Push our callback handler
customAuthContext.push(function(/* err */) {
applyAuth(authContexts, server, callback);
});
// Attempt authentication
server.auth.apply(server, customAuthContext);
}
// Apply all auth contexts
applyAuth(authContexts, server, callback);
}
function shouldTriggerConnect(self) {

@@ -810,63 +754,55 @@ const isConnecting = self.state === CONNECTING;

if (event === 'connect') {
// Do we have authentication contexts that need to be applied
applyAuthenticationContexts(self, _this, function() {
// Destroy the instance
if (self.state === DESTROYED || self.state === UNREFERENCED) {
return _this.destroy({ force: true });
// Update the state
var result = self.s.replicaSetState.update(_this);
if (result === true) {
// Primary lastIsMaster store it
if (_this.lastIsMaster() && _this.lastIsMaster().ismaster) {
self.ismaster = _this.lastIsMaster();
}
// Update the state
var result = self.s.replicaSetState.update(_this);
if (result === true) {
// Primary lastIsMaster store it
if (_this.lastIsMaster() && _this.lastIsMaster().ismaster) {
self.ismaster = _this.lastIsMaster();
}
// Debug log
if (self.s.logger.isDebug()) {
self.s.logger.debug(
f(
'handleInitialConnectEvent %s from server %s in replset with id %s has state [%s]',
event,
_this.name,
self.id,
JSON.stringify(self.s.replicaSetState.set)
)
);
}
// Debug log
if (self.s.logger.isDebug()) {
self.s.logger.debug(
f(
'handleInitialConnectEvent %s from server %s in replset with id %s has state [%s]',
event,
_this.name,
self.id,
JSON.stringify(self.s.replicaSetState.set)
)
);
}
// Remove the handlers
for (let i = 0; i < handlers.length; i++) {
_this.removeAllListeners(handlers[i]);
}
// Remove the handlers
for (var i = 0; i < handlers.length; i++) {
_this.removeAllListeners(handlers[i]);
}
// Add stable state handlers
_this.on('error', handleEvent(self, 'error'));
_this.on('close', handleEvent(self, 'close'));
_this.on('timeout', handleEvent(self, 'timeout'));
_this.on('parseError', handleEvent(self, 'parseError'));
// Add stable state handlers
_this.on('error', handleEvent(self, 'error'));
_this.on('close', handleEvent(self, 'close'));
_this.on('timeout', handleEvent(self, 'timeout'));
_this.on('parseError', handleEvent(self, 'parseError'));
// Do we have a primary or primaryAndSecondary
if (shouldTriggerConnect(self)) {
// We are connected
self.state = CONNECTED;
// Do we have a primary or primaryAndSecondary
if (shouldTriggerConnect(self)) {
// We are connected
self.state = CONNECTED;
// Set initial connect state
self.initialConnectState.connect = true;
// Emit connect event
process.nextTick(function() {
self.emit('connect', self);
});
// Set initial connect state
self.initialConnectState.connect = true;
// Emit connect event
process.nextTick(function() {
self.emit('connect', self);
});
topologyMonitor(self, {});
}
} else if (result instanceof MongoError) {
_this.destroy({ force: true });
self.destroy({ force: true });
return self.emit('error', result);
} else {
_this.destroy({ force: true });
topologyMonitor(self, {});
}
});
} else if (result instanceof MongoError) {
_this.destroy({ force: true });
self.destroy({ force: true });
return self.emit('error', result);
} else {
_this.destroy({ force: true });
}
} else {

@@ -966,4 +902,2 @@ // Emit failure to connect

* Initiate server connect
* @method
* @param {array} [options.auth=null] Array of auth options to apply on connect
*/

@@ -982,3 +916,2 @@ ReplSet.prototype.connect = function(options) {

Object.assign({}, self.s.options, x, options, {
authProviders: self.authProviders,
reconnect: false,

@@ -1016,2 +949,12 @@ monitoring: false,

/**
* Authenticate the topology.
* @method
* @param {MongoCredentials} credentials The credentials for authentication we are using
* @param {authResultCallback} callback A callback function
*/
ReplSet.prototype.auth = function(credentials, callback) {
callback(null, null);
};
/**
* Destroy the server connection

@@ -1029,4 +972,2 @@ * @param {boolean} [options.force=false] Force destroy the pool

this.s.replicaSetState.destroy(options);
// Clear out authentication contexts
this.s.authenticationContexts = [];

@@ -1110,6 +1051,2 @@ // Destroy all connecting servers

// If we are authenticating signal not connected
// To avoid interleaving of operations
if (this.authenticating) return false;
// If we specified a read preference check if we are connected to something

@@ -1155,2 +1092,3 @@ // than can satisfy this

* @param {ReadPreference} [options.readPreference] Specify read preference if command supports it
* @param {ClientSession} [options.session] Unused
* @param {function} callback

@@ -1403,177 +1341,2 @@ */

/**
* Authenticate using a specified mechanism
* @method
* @param {string} mechanism The Auth mechanism we are invoking
* @param {string} db The db we are invoking the mechanism against
* @param {...object} param Parameters for the specific mechanism
* @param {authResultCallback} callback A callback function
*/
ReplSet.prototype.auth = function(mechanism, db) {
var allArgs = Array.prototype.slice.call(arguments, 0).slice(0);
var self = this;
var args = Array.prototype.slice.call(arguments, 2);
var callback = args.pop();
var currentContextIndex = 0;
// If we don't have the mechanism fail
if (this.authProviders[mechanism] == null && mechanism !== 'default') {
return callback(new MongoError(f('auth provider %s does not exist', mechanism)));
}
// Are we already authenticating, throw
if (this.authenticating) {
return callback(new MongoError('authentication or logout allready in process'));
}
// Topology is not connected, save the call in the provided store to be
// Executed at some point when the handler deems it's reconnected
if (!this.isConnected() && self.s.disconnectHandler != null) {
if (!self.s.replicaSetState.hasPrimary() && !self.s.options.secondaryOnlyConnectionAllowed) {
return self.s.disconnectHandler.add('auth', db, allArgs, {}, callback);
} else if (
!self.s.replicaSetState.hasSecondary() &&
self.s.options.secondaryOnlyConnectionAllowed
) {
return self.s.disconnectHandler.add('auth', db, allArgs, {}, callback);
}
}
// Set to authenticating
this.authenticating = true;
// All errors
var errors = [];
// Get all the servers
var servers = this.s.replicaSetState.allServers();
// No servers return
if (servers.length === 0) {
this.authenticating = false;
callback(null, true);
}
// Authenticate
function auth(server) {
// Arguments without a callback
var argsWithoutCallback = [mechanism, db].concat(args.slice(0));
// Create arguments
var finalArguments = argsWithoutCallback.concat([
function(err) {
count = count - 1;
// Save all the errors
if (err) errors.push({ name: server.name, err: err });
// We are done
if (count === 0) {
// Auth is done
self.authenticating = false;
// Return the auth error
if (errors.length) {
// Remove the entry from the stored authentication contexts
self.s.authenticationContexts.splice(currentContextIndex, 0);
// Return error
return callback(
new MongoError({
message: 'authentication fail',
errors: errors
}),
false
);
}
// Successfully authenticated session
callback(null, self);
}
}
]);
if (!server.lastIsMaster().arbiterOnly) {
// Execute the auth only against non arbiter servers
server.auth.apply(server, finalArguments);
} else {
// If we are authenticating against an arbiter just ignore it
finalArguments.pop()(null);
}
}
// Get total count
var count = servers.length;
// Save current context index
currentContextIndex = this.s.authenticationContexts.length;
// Store the auth context and return the last index
this.s.authenticationContexts.push([mechanism, db].concat(args.slice(0)));
// Authenticate against all servers
while (servers.length > 0) {
auth(servers.shift());
}
};
/**
* Logout from a database
* @method
* @param {string} db The db we are logging out from
* @param {authResultCallback} callback A callback function
*/
ReplSet.prototype.logout = function(dbName, callback) {
var self = this;
// Are we authenticating or logging out, throw
if (this.authenticating) {
throw new MongoError('authentication or logout allready in process');
}
// Ensure no new members are processed while logging out
this.authenticating = true;
// Remove from all auth providers (avoid any reaplication of the auth details)
var providers = Object.keys(this.authProviders);
for (var i = 0; i < providers.length; i++) {
this.authProviders[providers[i]].logout(dbName);
}
// Clear out any contexts associated with the db
self.s.authenticationContexts = self.s.authenticationContexts.filter(function(context) {
return context[1] !== dbName;
});
// Now logout all the servers
var servers = this.s.replicaSetState.allServers();
var count = servers.length;
if (count === 0) return callback();
var errors = [];
function logoutServer(_server, cb) {
_server.logout(dbName, function(err) {
if (err) errors.push({ name: _server.name, err: err });
cb();
});
}
// Execute logout on all server instances
for (i = 0; i < servers.length; i++) {
logoutServer(servers[i], function() {
count = count - 1;
if (count === 0) {
// Do not block new operations
self.authenticating = false;
// If we have one or more errors
if (errors.length)
return callback(
new MongoError({
message: f('logout failed against db %s', dbName),
errors: errors
}),
false
);
// No errors
callback();
}
});
}
};
/**
* Get a new cursor

@@ -1580,0 +1343,0 @@ * @method

@@ -11,7 +11,5 @@ 'use strict';

Pool = require('../connection/pool'),
Query = require('../connection/commands').Query,
MongoError = require('../error').MongoError,
MongoNetworkError = require('../error').MongoNetworkError,
TwoSixWireProtocolSupport = require('../wireprotocol/2_6_support'),
ThreeTwoWireProtocolSupport = require('../wireprotocol/3_2_support'),
wireProtocol = require('../wireprotocol'),
BasicCursor = require('../cursor'),

@@ -27,53 +25,2 @@ sdam = require('./shared'),

function getSaslSupportedMechs(options) {
if (!options) {
return {};
}
const authArray = options.auth || [];
const authMechanism = authArray[0] || options.authMechanism;
const authSource = authArray[1] || options.authSource || options.dbName || 'admin';
const user = authArray[2] || options.user;
if (typeof authMechanism === 'string' && authMechanism.toUpperCase() !== 'DEFAULT') {
return {};
}
if (!user) {
return {};
}
return { saslSupportedMechs: `${authSource}.${user}` };
}
function getDefaultAuthMechanism(ismaster) {
if (ismaster) {
// If ismaster contains saslSupportedMechs, use scram-sha-256
// if it is available, else scram-sha-1
if (Array.isArray(ismaster.saslSupportedMechs)) {
return ismaster.saslSupportedMechs.indexOf('SCRAM-SHA-256') >= 0
? 'scram-sha-256'
: 'scram-sha-1';
}
// Fallback to legacy selection method. If wire version >= 3, use scram-sha-1
if (ismaster.maxWireVersion >= 3) {
return 'scram-sha-1';
}
}
// Default for wireprotocol < 3
return 'mongocr';
}
function extractIsMasterError(err, result) {
if (err) {
return err;
}
if (result && result.result && result.result.ok === 0) {
return new MongoError(result.result);
}
}
// Used for filtering out fields for loggin

@@ -95,3 +42,2 @@ var debugFields = [

'socketTimeout',
'singleBufferSerializtion',
'ssl',

@@ -231,5 +177,2 @@ 'ca',

this.initialConnect = true;
// Wire protocol handler, default to oldest known protocol handler
// this gets changed when the first ismaster is called.
this.wireProtocolHandler = new TwoSixWireProtocolSupport();
// Default type

@@ -310,16 +253,2 @@ this._type = 'server';

function isSupportedServer(response) {
return response && typeof response.maxWireVersion === 'number' && response.maxWireVersion >= 2;
}
function configureWireProtocolHandler(self, ismaster) {
// 3.2 wire protocol handler
if (ismaster.maxWireVersion >= 4) {
return new ThreeTwoWireProtocolSupport();
}
// default to 2.6 wire protocol handler
return new TwoSixWireProtocolSupport();
}
function disconnectHandler(self, type, ns, cmd, options, callback) {

@@ -352,6 +281,2 @@ // Topology is not connected, save the call in the provided store to be

// Perform ismaster call
// Query options
var queryOptions = { numberToSkip: 0, numberToReturn: -1, checkKeys: false, slaveOk: true };
// Create a query instance
var query = new Query(self.s.bson, 'admin.$cmd', { ismaster: true }, queryOptions);
// Get start time

@@ -361,4 +286,5 @@ var start = new Date().getTime();

// Execute the ismaster query
self.s.pool.write(
query,
self.command(
'admin.$cmd',
{ ismaster: true },
{

@@ -371,3 +297,3 @@ socketTimeout:

},
function(err, result) {
(err, result) => {
// Set initial lastIsMasterMS

@@ -388,3 +314,3 @@ self.lastIsMasterMS = new Date().getTime() - start;

var eventHandler = function(self, event) {
return function(err) {
return function(err, conn) {
// Log information of received information if in info mode

@@ -400,122 +326,63 @@ if (self.s.logger.isInfo()) {

if (event === 'connect') {
// Issue an ismaster command at connect
// Query options
var queryOptions = { numberToSkip: 0, numberToReturn: -1, checkKeys: false, slaveOk: true };
// Create a query instance
var compressors =
self.s.compression && self.s.compression.compressors ? self.s.compression.compressors : [];
var query = new Query(
self.s.bson,
'admin.$cmd',
Object.assign(
{ ismaster: true, client: self.clientInfo, compression: compressors },
getSaslSupportedMechs(self.s.options)
),
queryOptions
);
// Get start time
var start = new Date().getTime();
// Execute the ismaster query
self.s.pool.write(
query,
{
socketTimeout: self.s.options.connectionTimeout || 2000
},
function(err, result) {
// Set initial lastIsMasterMS
self.lastIsMasterMS = new Date().getTime() - start;
self.initialConnect = false;
self.ismaster = conn.ismaster;
self.lastIsMasterMS = conn.lastIsMasterMS;
if (conn.agreedCompressor) {
self.s.pool.options.agreedCompressor = conn.agreedCompressor;
}
const serverError = extractIsMasterError(err, result);
if (conn.zlibCompressionLevel) {
self.s.pool.options.zlibCompressionLevel = conn.zlibCompressionLevel;
}
if (serverError) {
self.destroy();
return self.emit('error', serverError);
}
if (conn.ismaster.$clusterTime) {
const $clusterTime = conn.ismaster.$clusterTime;
self.clusterTime = $clusterTime;
}
if (!isSupportedServer(result.result)) {
self.destroy();
const latestSupportedVersion = '2.6';
const message =
'Server at ' +
self.s.options.host +
':' +
self.s.options.port +
' reports wire version ' +
(result.result.maxWireVersion || 0) +
', but this version of Node.js Driver requires at least 2 (MongoDB' +
latestSupportedVersion +
').';
return self.emit('error', new MongoError(message), self);
}
// It's a proxy change the type so
// the wireprotocol will send $readPreference
if (self.ismaster.msg === 'isdbgrid') {
self._type = 'mongos';
}
// Determine whether the server is instructing us to use a compressor
if (result.result && result.result.compression) {
for (var i = 0; i < self.s.compression.compressors.length; i++) {
if (result.result.compression.indexOf(self.s.compression.compressors[i]) > -1) {
self.s.pool.options.agreedCompressor = self.s.compression.compressors[i];
break;
}
}
// Have we defined self monitoring
if (self.s.monitoring) {
self.monitoringProcessId = setTimeout(monitoringProcess(self), self.s.monitoringInterval);
}
if (self.s.compression.zlibCompressionLevel) {
self.s.pool.options.zlibCompressionLevel = self.s.compression.zlibCompressionLevel;
// Emit server description changed if something listening
sdam.emitServerDescriptionChanged(self, {
address: self.name,
arbiters: [],
hosts: [],
passives: [],
type: sdam.getTopologyType(self)
});
if (!self.s.inTopology) {
// Emit topology description changed if something listening
sdam.emitTopologyDescriptionChanged(self, {
topologyType: 'Single',
servers: [
{
address: self.name,
arbiters: [],
hosts: [],
passives: [],
type: sdam.getTopologyType(self)
}
}
]
});
}
// Ensure no error emitted after initial connect when reconnecting
self.initialConnect = false;
// Save the ismaster
self.ismaster = result.result;
// Log the ismaster if available
if (self.s.logger.isInfo()) {
self.s.logger.info(
f('server %s connected with ismaster [%s]', self.name, JSON.stringify(self.ismaster))
);
}
// It's a proxy change the type so
// the wireprotocol will send $readPreference
if (self.ismaster.msg === 'isdbgrid') {
self._type = 'mongos';
}
// Add the correct wire protocol handler
self.wireProtocolHandler = configureWireProtocolHandler(self, self.ismaster);
// Have we defined self monitoring
if (self.s.monitoring) {
self.monitoringProcessId = setTimeout(
monitoringProcess(self),
self.s.monitoringInterval
);
}
// Emit server description changed if something listening
sdam.emitServerDescriptionChanged(self, {
address: self.name,
arbiters: [],
hosts: [],
passives: [],
type: sdam.getTopologyType(self)
});
if (!self.s.inTopology) {
// Emit topology description changed if something listening
sdam.emitTopologyDescriptionChanged(self, {
topologyType: 'Single',
servers: [
{
address: self.name,
arbiters: [],
hosts: [],
passives: [],
type: sdam.getTopologyType(self)
}
]
});
}
// Log the ismaster if available
if (self.s.logger.isInfo()) {
self.s.logger.info(
f('server %s connected with ismaster [%s]', self.name, JSON.stringify(self.ismaster))
);
}
// Emit connect
self.emit('connect', self);
}
);
// Emit connect
self.emit('connect', self);
} else if (

@@ -602,4 +469,2 @@ event === 'error' ||

* Initiate server connect
* @method
* @param {array} [options.auth=null] Array of auth options to apply on connect
*/

@@ -644,11 +509,16 @@ Server.prototype.connect = function(options) {

// Connect with optional auth settings
if (options.auth) {
self.s.pool.connect.apply(self.s.pool, options.auth);
} else {
self.s.pool.connect();
}
self.s.pool.connect();
};
/**
* Authenticate the topology.
* @method
* @param {MongoCredentials} credentials The credentials for authentication we are using
* @param {authResultCallback} callback A callback function
*/
Server.prototype.auth = function(credentials, callback) {
callback(null, null);
};
/**
* Get the server description

@@ -770,3 +640,3 @@ * @method

self.wireProtocolHandler.command(self, ns, cmd, options, callback);
wireProtocol.command(self, ns, cmd, options, callback);
};

@@ -802,3 +672,3 @@

// Execute write
return self.wireProtocolHandler.insert(self, ns, ops, options, callback);
return wireProtocol.insert(self, ns, ops, options, callback);
};

@@ -838,3 +708,3 @@

// Execute write
return self.wireProtocolHandler.update(self, ns, ops, options, callback);
return wireProtocol.update(self, ns, ops, options, callback);
};

@@ -874,3 +744,3 @@

// Execute write
return self.wireProtocolHandler.remove(self, ns, ops, options, callback);
return wireProtocol.remove(self, ns, ops, options, callback);
};

@@ -905,48 +775,2 @@

/**
* Logout from a database
* @method
* @param {string} db The db we are logging out from
* @param {authResultCallback} callback A callback function
*/
Server.prototype.logout = function(dbName, callback) {
this.s.pool.logout(dbName, callback);
};
/**
* Authenticate using a specified mechanism
* @method
* @param {string} mechanism The Auth mechanism we are invoking
* @param {string} db The db we are invoking the mechanism against
* @param {...object} param Parameters for the specific mechanism
* @param {authResultCallback} callback A callback function
*/
Server.prototype.auth = function(mechanism, db) {
var self = this;
if (mechanism === 'default') {
mechanism = getDefaultAuthMechanism(self.ismaster);
}
// Slice all the arguments off
var args = Array.prototype.slice.call(arguments, 0);
// Set the mechanism
args[0] = mechanism;
// Get the callback
var callback = args[args.length - 1];
// If we are not connected or have a disconnectHandler specified
if (disconnectHandler(self, 'auth', db, args, {}, callback)) {
return;
}
// Do not authenticate if we are an arbiter
if (this.lastIsMaster() && this.lastIsMaster().arbiterOnly) {
return callback(null, true);
}
// Apply the arguments to the pool
self.s.pool.auth.apply(self.s.pool, args);
};
/**
* Compare two server instances

@@ -974,2 +798,6 @@ * @method

* Selects a server
* @method
* @param {function} selector Unused
* @param {ReadPreference} [options.readPreference] Unused
* @param {ClientSession} [options.session] Unused
* @return {Server}

@@ -976,0 +804,0 @@ */

@@ -104,4 +104,16 @@ 'use strict';

if (options.readPreference) this.options.readPreference = options.readPreference;
// TODO: This isn't technically necessary
this._pinnedServer = undefined;
this._recoveryToken = undefined;
}
get server() {
return this._pinnedServer;
}
get recoveryToken() {
return this._recoveryToken;
}
/**

@@ -126,2 +138,5 @@ * @ignore

this.state = nextState;
if (this.state === TxnState.NO_TRANSACTION || this.state === TxnState.STARTING_TRANSACTION) {
this.unpinServer();
}
return;

@@ -134,4 +149,14 @@ }

}
pinServer(server) {
if (this.isActive) {
this._pinnedServer = server;
}
}
unpinServer() {
this._pinnedServer = undefined;
}
}
module.exports = { TxnState, Transaction };

@@ -78,2 +78,21 @@ 'use strict';

/**
* A helper function for determining `maxWireVersion` between legacy and new topology
* instances
*
* @private
* @param {(Topology|Server)} topologyOrServer
*/
function maxWireVersion(topologyOrServer) {
if (topologyOrServer.ismaster) {
return topologyOrServer.ismaster.maxWireVersion;
}
if (topologyOrServer.description) {
return topologyOrServer.description.maxWireVersion;
}
return null;
}
/*

@@ -88,5 +107,15 @@ * Checks that collation is supported by server.

function collationNotSupported(server, cmd) {
return cmd && cmd.collation && server.ismaster && server.ismaster.maxWireVersion < 5;
return cmd && cmd.collation && maxWireVersion(server) < 5;
}
/**
* Checks if a given value is a Promise
*
* @param {*} maybePromise
* @return true if the provided value is a Promise
*/
function isPromiseLike(maybePromise) {
return maybePromise && typeof maybePromise.then === 'function';
}
module.exports = {

@@ -98,3 +127,5 @@ uuidV4,

retrieveEJSON,
retrieveKerberos
retrieveKerberos,
maxWireVersion,
isPromiseLike
};
'use strict';
var ReadPreference = require('../topologies/read_preference'),
MongoError = require('../error').MongoError;
const ReadPreference = require('../topologies/read_preference');
const MongoError = require('../error').MongoError;
const ServerType = require('../sdam/server_description').ServerType;
const TopologyDescription = require('../sdam/topology_description').TopologyDescription;
var MESSAGE_HEADER_SIZE = 16;
const MESSAGE_HEADER_SIZE = 16;
const COMPRESSION_DETAILS_SIZE = 9; // originalOpcode + uncompressedSize, compressorID

@@ -18,3 +21,4 @@ // OPCODE Numbers

OP_KILL_CURSORS: 2007,
OP_COMPRESSED: 2012
OP_COMPRESSED: 2012,
OP_MSG: 2013
};

@@ -76,6 +80,15 @@

function isMongos(server) {
if (server.type === 'mongos') return true;
if (server.parent && server.parent.type === 'mongos') return true;
// NOTE: handle unified topology
function isSharded(topologyOrServer) {
if (topologyOrServer.type === 'mongos') return true;
if (topologyOrServer.description && topologyOrServer.description.type === ServerType.Mongos) {
return true;
}
// NOTE: This is incredibly inefficient, and should be removed once command construction
// happens based on `Server` not `Topology`.
if (topologyOrServer.description && topologyOrServer.description instanceof TopologyDescription) {
const servers = Array.from(topologyOrServer.description.servers.values());
return servers.some(server => server.type === ServerType.Mongos);
}
return false;

@@ -97,8 +110,9 @@ }

MESSAGE_HEADER_SIZE,
COMPRESSION_DETAILS_SIZE,
opcodes,
parseHeader,
applyCommonQueryOptions,
isMongos,
isSharded,
databaseNamespace,
collectionNamespace
};
{
"name": "mongodb-core",
"version": "3.1.11",
"version": "3.2.0-beta1",
"description": "Core MongoDB driver functionality, no bells and whistles and meant for integration not end applications",

@@ -40,3 +40,3 @@ "main": "index.js",

"mongodb-extjson": "^2.1.2",
"mongodb-mock-server": "^1.0.0",
"mongodb-mock-server": "^1.0.1",
"mongodb-test-runner": "^1.1.18",

@@ -43,0 +43,0 @@ "prettier": "~1.12.0",

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap

Packages

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc