Comparing version 1.2.10 to 1.2.11
var EventEmitter = require('events').EventEmitter | ||
, inherits = require('util').inherits; | ||
/** | ||
* Internal class for callback storage | ||
* @ignore | ||
*/ | ||
var CallbackStore = function() { | ||
// Make class an event emitter | ||
EventEmitter.call(this); | ||
// Add a info about call variable | ||
this._notReplied = {}; | ||
} | ||
/** | ||
* @ignore | ||
*/ | ||
inherits(CallbackStore, EventEmitter); | ||
var Base = function Base() { | ||
EventEmitter.call(this); | ||
// Callback store is part of connection specification | ||
if(Base._callBackStore == null) { | ||
Base._callBackStore = new CallbackStore(); | ||
} | ||
// this._callBackStore = Base._callBackStore; | ||
this._callBackStore = new CallbackStore(); | ||
} | ||
@@ -18,29 +42,24 @@ | ||
Base.prototype.__executeAllCallbacksWithError = function(err) { | ||
// Locate all the possible callbacks that need to return | ||
for(var i = 0; i < this.dbInstances.length; i++) { | ||
// Fetch the db Instance | ||
var dbInstance = this.dbInstances[i]; | ||
// Check all callbacks | ||
var keys = Object.keys(dbInstance._callBackStore._notReplied); | ||
// For each key check if it's a callback that needs to be returned | ||
for(var j = 0; j < keys.length; j++) { | ||
var info = dbInstance._callBackStore._notReplied[keys[j]]; | ||
// Check if we have a chained command (findAndModify) | ||
if(info && info['chained'] && Array.isArray(info['chained']) && info['chained'].length > 0) { | ||
var chained = info['chained']; | ||
// Only callback once and the last one is the right one | ||
var finalCallback = chained.pop(); | ||
// Emit only the last event | ||
dbInstance._callBackStore.emit(finalCallback, err, null); | ||
// Check all callbacks | ||
var keys = Object.keys(this._callBackStore._notReplied); | ||
// For each key check if it's a callback that needs to be returned | ||
for(var j = 0; j < keys.length; j++) { | ||
var info = this._callBackStore._notReplied[keys[j]]; | ||
// Check if we have a chained command (findAndModify) | ||
if(info && info['chained'] && Array.isArray(info['chained']) && info['chained'].length > 0) { | ||
var chained = info['chained']; | ||
// Only callback once and the last one is the right one | ||
var finalCallback = chained.pop(); | ||
// Emit only the last event | ||
this._callBackStore.emit(finalCallback, err, null); | ||
// Put back the final callback to ensure we don't call all commands in the chain | ||
chained.push(finalCallback); | ||
// Put back the final callback to ensure we don't call all commands in the chain | ||
chained.push(finalCallback); | ||
// Remove all chained callbacks | ||
for(var i = 0; i < chained.length; i++) { | ||
delete dbInstance._callBackStore._notReplied[chained[i]]; | ||
} | ||
} else { | ||
dbInstance._callBackStore.emit(keys[j], err, null); | ||
// Remove all chained callbacks | ||
for(var i = 0; i < chained.length; i++) { | ||
delete this._callBackStore._notReplied[chained[i]]; | ||
} | ||
} else { | ||
this._callBackStore.emit(keys[j], err, null); | ||
} | ||
@@ -50,3 +69,103 @@ } | ||
/** | ||
* Register a handler | ||
* @ignore | ||
* @api private | ||
*/ | ||
Base.prototype._registerHandler = function(db_command, raw, connection, exhaust, callback) { | ||
// If we have an array of commands, chain them | ||
var chained = Array.isArray(db_command); | ||
// Check if we have exhausted | ||
if(typeof exhaust == 'function') { | ||
callback = exhaust; | ||
exhaust = false; | ||
} | ||
// If they are chained we need to add a special handler situation | ||
if(chained) { | ||
// List off chained id's | ||
var chainedIds = []; | ||
// Add all id's | ||
for(var i = 0; i < db_command.length; i++) chainedIds.push(db_command[i].getRequestId().toString()); | ||
// Register all the commands together | ||
for(var i = 0; i < db_command.length; i++) { | ||
var command = db_command[i]; | ||
// Add the callback to the store | ||
this._callBackStore.once(command.getRequestId(), callback); | ||
// Add the information about the reply | ||
this._callBackStore._notReplied[command.getRequestId().toString()] = {start: new Date().getTime(), 'raw': raw, chained:chainedIds, connection:connection, exhaust:false}; | ||
} | ||
} else { | ||
// Add the callback to the list of handlers | ||
this._callBackStore.once(db_command.getRequestId(), callback); | ||
// Add the information about the reply | ||
this._callBackStore._notReplied[db_command.getRequestId().toString()] = {start: new Date().getTime(), 'raw': raw, connection:connection, exhaust:exhaust}; | ||
} | ||
} | ||
/** | ||
* Re-Register a handler, on the cursor id f.ex | ||
* @ignore | ||
* @api private | ||
*/ | ||
Base.prototype._reRegisterHandler = function(newId, object, callback) { | ||
// Add the callback to the list of handlers | ||
this._callBackStore.once(newId, object.callback.listener); | ||
// Add the information about the reply | ||
this._callBackStore._notReplied[newId] = object.info; | ||
} | ||
/** | ||
* | ||
* @ignore | ||
* @api private | ||
*/ | ||
Base.prototype._callHandler = function(id, document, err) { | ||
// If there is a callback peform it | ||
if(this._callBackStore.listeners(id).length >= 1) { | ||
// Get info object | ||
var info = this._callBackStore._notReplied[id]; | ||
// Delete the current object | ||
delete this._callBackStore._notReplied[id]; | ||
// Emit to the callback of the object | ||
this._callBackStore.emit(id, err, document, info.connection); | ||
} | ||
} | ||
/** | ||
* | ||
* @ignore | ||
* @api private | ||
*/ | ||
Base.prototype._hasHandler = function(id) { | ||
// If there is a callback peform it | ||
return this._callBackStore.listeners(id).length >= 1; | ||
} | ||
/** | ||
* | ||
* @ignore | ||
* @api private | ||
*/ | ||
Base.prototype._removeHandler = function(id) { | ||
// Remove the information | ||
if(this._callBackStore._notReplied[id] != null) delete this._callBackStore._notReplied[id]; | ||
// Remove the callback if it's registered | ||
this._callBackStore.removeAllListeners(id); | ||
// Force cleanup _events, node.js seems to set it as a null value | ||
if(this._callBackStore._events != null) delete this._callBackStore._events[id]; | ||
} | ||
/** | ||
* | ||
* @ignore | ||
* @api private | ||
*/ | ||
Base.prototype._findHandler = function(id) { | ||
var info = this._callBackStore._notReplied[id]; | ||
// Return the callback | ||
return {info:info, callback:(this._callBackStore.listeners(id).length >= 1) ? this._callBackStore.listeners(id)[0] : null} | ||
} | ||
exports.Base = Base; |
@@ -77,3 +77,3 @@ var utils = require('./connection_utils'), | ||
// Check if the driver should validate the certificate | ||
var validate_certificates = this.socketOptions.ssl_validate == true ? true : false; | ||
var validate_certificates = this.socketOptions.sslValidate == true ? true : false; | ||
@@ -88,9 +88,9 @@ // Create options for the tls connection | ||
if(validate_certificates) { | ||
tls_options.ca = this.socketOptions.ssl_ca; | ||
tls_options.ca = this.socketOptions.sslCA; | ||
} | ||
// If we have a certificate to present | ||
if(this.socketOptions.ssl_cert) { | ||
tls_options.cert = this.socketOptions.ssl_cert; | ||
tls_options.key = this.socketOptions.ssl_key; | ||
if(this.socketOptions.sslCert) { | ||
tls_options.cert = this.socketOptions.sslCert; | ||
tls_options.key = this.socketOptions.sslKey; | ||
// Allow for a combined cert/key pem file being passed in as cert parameter | ||
@@ -103,4 +103,4 @@ // if(tls_options.key == null) { | ||
// If the driver has been provided a private key password | ||
if(this.socketOptions.ssl_pass) { | ||
tls_options.passphrase = this.socketOptions.ssl_pass; | ||
if(this.socketOptions.sslPass) { | ||
tls_options.passphrase = this.socketOptions.sslPass; | ||
} | ||
@@ -107,0 +107,0 @@ |
@@ -48,2 +48,3 @@ var ReadPreference = require('./read_preference').ReadPreference | ||
var server = this.servers[i]; | ||
server._callBackStore = this._callBackStore; | ||
// Default empty socket options object | ||
@@ -50,0 +51,0 @@ var socketOptions = {host: server.host, port: server.port}; |
@@ -40,7 +40,7 @@ var Connection = require('./connection').Connection, | ||
* - **ssl** {Boolean, default:false}, use ssl connection (needs to have a mongod server with ssl support) | ||
* - **ssl_validate** {Boolean, default:false}, validate mongod server certificate against ca (needs to have a mongod server with ssl support, 2.4 or higher) | ||
* - **ssl_ca** {Array, default:null}, Array of valid certificates either as Buffers or Strings (needs to have a mongod server with ssl support, 2.4 or higher) | ||
* - **ssl_cert** {Buffer/String, default:null}, String or buffer containing the certificate we wish to present (needs to have a mongod server with ssl support, 2.4 or higher) | ||
* - **ssl_key** {Buffer/String, default:null}, String or buffer containing the certificate private key we wish to present (needs to have a mongod server with ssl support, 2.4 or higher) | ||
* - **ssl_pass** {Buffer/String, default:null}, String or buffer containing the certificate password (needs to have a mongod server with ssl support, 2.4 or higher) | ||
* - **sslValidate** {Boolean, default:false}, validate mongod server certificate against ca (needs to have a mongod server with ssl support, 2.4 or higher) | ||
* - **sslCA** {Array, default:null}, Array of valid certificates either as Buffers or Strings (needs to have a mongod server with ssl support, 2.4 or higher) | ||
* - **sslCert** {Buffer/String, default:null}, String or buffer containing the certificate we wish to present (needs to have a mongod server with ssl support, 2.4 or higher) | ||
* - **sslKey** {Buffer/String, default:null}, String or buffer containing the certificate private key we wish to present (needs to have a mongod server with ssl support, 2.4 or higher) | ||
* - **sslPass** {Buffer/String, default:null}, String or buffer containing the certificate password (needs to have a mongod server with ssl support, 2.4 or higher) | ||
* | ||
@@ -90,15 +90,15 @@ * @class Represents a Replicaset Configuration | ||
// Set ssl validation | ||
this.ssl_validate = this.options.ssl_validate == null ? false : this.options.ssl_validate; | ||
this.sslValidate = this.options.sslValidate == null ? false : this.options.sslValidate; | ||
// Set the ssl certificate authority (array of Buffer/String keys) | ||
this.ssl_ca = Array.isArray(this.options.ssl_ca) ? this.options.ssl_ca : null; | ||
this.sslCA = Array.isArray(this.options.sslCA) ? this.options.sslCA : null; | ||
// Certificate to present to the server | ||
this.ssl_cert = this.options.ssl_cert; | ||
this.sslCert = this.options.sslCert; | ||
// Certificate private key if in separate file | ||
this.ssl_key = this.options.ssl_key; | ||
this.sslKey = this.options.sslKey; | ||
// Password to unlock private key | ||
this.ssl_pass = this.options.ssl_pass; | ||
this.sslPass = this.options.sslPass; | ||
// Ensure we are not trying to validate with no list of certificates | ||
if(this.ssl_validate && (!Array.isArray(this.ssl_ca) || this.ssl_ca.length == 0)) { | ||
throw new Error("The driver expects an Array of CA certificates in the ssl_ca parameter when enabling ssl_validate"); | ||
if(this.sslValidate && (!Array.isArray(this.sslCA) || this.sslCA.length == 0)) { | ||
throw new Error("The driver expects an Array of CA certificates in the sslCA parameter when enabling sslValidate"); | ||
} | ||
@@ -178,8 +178,10 @@ | ||
server.ssl = self.ssl; | ||
server.ssl_validate = self.ssl_validate; | ||
server.ssl_ca = self.ssl_ca; | ||
server.ssl_cert = self.ssl_cert; | ||
server.ssl_key = self.ssl_key; | ||
server.ssl_pass = self.ssl_pass; | ||
server.sslValidate = self.sslValidate; | ||
server.sslCA = self.sslCA; | ||
server.sslCert = self.sslCert; | ||
server.sslKey = self.sslKey; | ||
server.sslPass = self.sslPass; | ||
server.poolSize = self.poolSize; | ||
// Set callback store | ||
server._callBackStore = self._callBackStore; | ||
}); | ||
@@ -378,9 +380,10 @@ | ||
ssl: self.ssl, | ||
ssl_validate: self.ssl_validate, | ||
ssl_ca: self.ssl_ca, | ||
ssl_cert: self.ssl_cert, | ||
ssl_key: self.ssl_key, | ||
ssl_pass: self.ssl_pass | ||
sslValidate: self.sslValidate, | ||
sslCA: self.sslCA, | ||
sslCert: self.sslCert, | ||
sslKey: self.sslKey, | ||
sslPass: self.sslPass | ||
}); | ||
self._haServer._callBackStore = self._callBackStore; | ||
// Add close handler | ||
@@ -604,10 +607,11 @@ self.on("close", function() { | ||
ssl: replset.ssl, | ||
ssl_validate: replset.ssl_validate, | ||
ssl_ca: replset.ssl_ca, | ||
ssl_cert: replset.ssl_cert, | ||
ssl_key: replset.ssl_key, | ||
ssl_pass: replset.ssl_pass | ||
sslValidate: replset.sslValidate, | ||
sslCA: replset.sslCA, | ||
sslCert: replset.sslCert, | ||
sslKey: replset.sslKey, | ||
sslPass: replset.sslPass | ||
} | ||
var server = new Server(socketOptions.host, socketOptions.port, serverOptions); | ||
server._callBackStore = replset._callBackStore; | ||
server.replicasetInstance = replset; | ||
@@ -739,9 +743,10 @@ server.on("close", _handler("close", replset)); | ||
var candidateServer = new Server(parts[0], parseInt(parts[1])); | ||
candidateServer._callBackStore = self._callBackStore; | ||
candidateServer.name = possibleHosts[j]; | ||
candidateServer.ssl = self.ssl; | ||
candidateServer.ssl_validate = self.ssl_validate; | ||
candidateServer.ssl_ca = self.ssl_ca; | ||
candidateServer.ssl_cert = self.ssl_cert; | ||
candidateServer.ssl_key = self.ssl_key; | ||
candidateServer.ssl_pass = self.ssl_pass; | ||
candidateServer.sslValidate = self.sslValidate; | ||
candidateServer.sslCA = self.sslCA; | ||
candidateServer.sslCert = self.sslCert; | ||
candidateServer.sslKey = self.sslKey; | ||
candidateServer.sslPass = self.sslPass; | ||
@@ -748,0 +753,0 @@ // Set the candidate server |
@@ -17,7 +17,7 @@ var Connection = require('./connection').Connection, | ||
* - **ssl** {Boolean, default:false}, use ssl connection (needs to have a mongod server with ssl support) | ||
* - **ssl_validate** {Boolean, default:false}, validate mongod server certificate against ca (needs to have a mongod server with ssl support, 2.4 or higher) | ||
* - **ssl_ca** {Array, default:null}, Array of valid certificates either as Buffers or Strings (needs to have a mongod server with ssl support, 2.4 or higher) | ||
* - **ssl_cert** {Buffer/String, default:null}, String or buffer containing the certificate we wish to present (needs to have a mongod server with ssl support, 2.4 or higher) | ||
* - **ssl_key** {Buffer/String, default:null}, String or buffer containing the certificate private key we wish to present (needs to have a mongod server with ssl support, 2.4 or higher) | ||
* - **ssl_pass** {Buffer/String, default:null}, String or buffer containing the certificate password (needs to have a mongod server with ssl support, 2.4 or higher) | ||
* - **sslValidate** {Boolean, default:false}, validate mongod server certificate against ca (needs to have a mongod server with ssl support, 2.4 or higher) | ||
* - **sslCA** {Array, default:null}, Array of valid certificates either as Buffers or Strings (needs to have a mongod server with ssl support, 2.4 or higher) | ||
* - **sslCert** {Buffer/String, default:null}, String or buffer containing the certificate we wish to present (needs to have a mongod server with ssl support, 2.4 or higher) | ||
* - **sslKey** {Buffer/String, default:null}, String or buffer containing the certificate private key we wish to present (needs to have a mongod server with ssl support, 2.4 or higher) | ||
* - **sslPass** {Buffer/String, default:null}, String or buffer containing the certificate password (needs to have a mongod server with ssl support, 2.4 or higher) | ||
* - **slaveOk** {Boolean, default:false}, legacy option allowing reads from secondary, use **readPrefrence** instead. | ||
@@ -64,15 +64,15 @@ * - **poolSize** {Number, default:5}, number of connections in the connection pool, set to 5 as default for legacy reasons. | ||
// Set ssl validation | ||
this.ssl_validate = this.options.ssl_validate == null ? false : this.options.ssl_validate; | ||
this.sslValidate = this.options.sslValidate == null ? false : this.options.sslValidate; | ||
// Set the ssl certificate authority (array of Buffer/String keys) | ||
this.ssl_ca = Array.isArray(this.options.ssl_ca) ? this.options.ssl_ca : null; | ||
this.sslCA = Array.isArray(this.options.sslCA) ? this.options.sslCA : null; | ||
// Certificate to present to the server | ||
this.ssl_cert = this.options.ssl_cert; | ||
this.sslCert = this.options.sslCert; | ||
// Certificate private key if in separate file | ||
this.ssl_key = this.options.ssl_key; | ||
this.sslKey = this.options.sslKey; | ||
// Password to unlock private key | ||
this.ssl_pass = this.options.ssl_pass; | ||
this.sslPass = this.options.sslPass; | ||
// Ensure we are not trying to validate with no list of certificates | ||
if(this.ssl_validate && (!Array.isArray(this.ssl_ca) || this.ssl_ca.length == 0)) { | ||
throw new Error("The driver expects an Array of CA certificates in the ssl_ca parameter when enabling ssl_validate"); | ||
if(this.sslValidate && (!Array.isArray(this.sslCA) || this.sslCA.length == 0)) { | ||
throw new Error("The driver expects an Array of CA certificates in the sslCA parameter when enabling sslValidate"); | ||
} | ||
@@ -108,11 +108,11 @@ | ||
// Set ssl validation | ||
this.socketOptions.ssl_validate = this.ssl_validate == null ? false : this.ssl_validate; | ||
this.socketOptions.sslValidate = this.sslValidate == null ? false : this.sslValidate; | ||
// Set the ssl certificate authority (array of Buffer/String keys) | ||
this.socketOptions.ssl_ca = Array.isArray(this.ssl_ca) ? this.ssl_ca : null; | ||
this.socketOptions.sslCA = Array.isArray(this.sslCA) ? this.sslCA : null; | ||
// Set certificate to present | ||
this.socketOptions.ssl_cert = this.ssl_cert; | ||
this.socketOptions.sslCert = this.sslCert; | ||
// Set certificate to present | ||
this.socketOptions.ssl_key = this.ssl_key; | ||
this.socketOptions.sslKey = this.sslKey; | ||
// Password to unlock private key | ||
this.socketOptions.ssl_pass = this.ssl_pass; | ||
this.socketOptions.sslPass = this.sslPass; | ||
} | ||
@@ -238,11 +238,11 @@ | ||
// Set ssl validation | ||
this.socketOptions.ssl_validate = replset.ssl_validate == null ? false : replset.ssl_validate; | ||
this.socketOptions.sslValidate = replset.sslValidate == null ? false : replset.sslValidate; | ||
// Set the ssl certificate authority (array of Buffer/String keys) | ||
this.socketOptions.ssl_ca = Array.isArray(replset.ssl_ca) ? replset.ssl_ca : null; | ||
this.socketOptions.sslCA = Array.isArray(replset.sslCA) ? replset.sslCA : null; | ||
// Set certificate to present | ||
this.socketOptions.ssl_cert = replset.ssl_cert; | ||
this.socketOptions.sslCert = replset.sslCert; | ||
// Set certificate to present | ||
this.socketOptions.ssl_key = replset.ssl_key; | ||
this.socketOptions.sslKey = replset.sslKey; | ||
// Password to unlock private key | ||
this.socketOptions.ssl_pass = replset.ssl_pass; | ||
this.socketOptions.sslPass = replset.sslPass; | ||
} | ||
@@ -274,11 +274,11 @@ | ||
// Set ssl validation | ||
this.socketOptions.ssl_validate = this.ssl_validate == null ? false : this.ssl_validate; | ||
this.socketOptions.sslValidate = this.sslValidate == null ? false : this.sslValidate; | ||
// Set the ssl certificate authority (array of Buffer/String keys) | ||
this.socketOptions.ssl_ca = Array.isArray(this.ssl_ca) ? this.ssl_ca : null; | ||
this.socketOptions.sslCA = Array.isArray(this.sslCA) ? this.sslCA : null; | ||
// Set certificate to present | ||
this.socketOptions.ssl_cert = this.ssl_cert; | ||
this.socketOptions.sslCert = this.sslCert; | ||
// Set certificate to present | ||
this.socketOptions.ssl_key = this.ssl_key; | ||
this.socketOptions.sslKey = this.sslKey; | ||
// Password to unlock private key | ||
this.socketOptions.ssl_pass = this.ssl_pass; | ||
this.socketOptions.sslPass = this.sslPass; | ||
} | ||
@@ -355,3 +355,3 @@ | ||
// Register handler for messages | ||
dbInstance._registerHandler(db_command, false, connection, connectHandler); | ||
server._registerHandler(db_command, false, connection, connectHandler); | ||
@@ -370,2 +370,3 @@ // Write the command out | ||
mongoReply.parseHeader(message, connectionPool.bson) | ||
// If message size is not the same as the buffer size | ||
@@ -382,22 +383,8 @@ // something went terribly wrong somewhere | ||
// Callback instance | ||
var callbackInfo = null; | ||
var dbInstanceObject = null; | ||
var callbackInfo = server._findHandler(mongoReply.responseTo.toString()); | ||
// Locate a callback instance and remove any additional ones | ||
for(var i = 0; i < server.dbInstances.length; i++) { | ||
var dbInstanceObjectTemp = server.dbInstances[i]; | ||
var hasHandler = dbInstanceObjectTemp._hasHandler(mongoReply.responseTo.toString()); | ||
// Assign the first one we find and remove any duplicate ones | ||
if(hasHandler && callbackInfo == null) { | ||
callbackInfo = dbInstanceObjectTemp._findHandler(mongoReply.responseTo.toString()); | ||
dbInstanceObject = dbInstanceObjectTemp; | ||
} else if(hasHandler) { | ||
dbInstanceObjectTemp._removeHandler(mongoReply.responseTo.toString()); | ||
} | ||
} | ||
// The command executed another request, log the handler again under that request id | ||
if(mongoReply.requestId > 0 && mongoReply.cursorId.toString() != "0" | ||
&& callbackInfo && callbackInfo.info && callbackInfo.info.exhaust) { | ||
dbInstance._reRegisterHandler(mongoReply.requestId, callbackInfo); | ||
server._reRegisterHandler(mongoReply.requestId, callbackInfo); | ||
} | ||
@@ -412,3 +399,3 @@ | ||
for(var i = 0; i < chained.length; i++) { | ||
if(dbInstanceObject._hasHandler(chained[i])) numberOfFoundCallbacks++; | ||
if(server._hasHandler(chained[i])) numberOfFoundCallbacks++; | ||
} | ||
@@ -419,3 +406,3 @@ | ||
for(var i = 0; i < chained.length; i++) { | ||
dbInstanceObject._removeHandler(chained[i]); | ||
server._removeHandler(chained[i]); | ||
} | ||
@@ -463,3 +450,3 @@ | ||
// Fetch the callback | ||
var callbackInfo = dbInstanceObject._findHandler(mongoReply.responseTo.toString()); | ||
var callbackInfo = server._findHandler(mongoReply.responseTo.toString()); | ||
// If we have an error let's execute the callback and clean up all other | ||
@@ -473,3 +460,3 @@ // chained commands | ||
// Trigger the callback for the error | ||
dbInstanceObject._callHandler(mongoReply.responseTo, mongoReply, null); | ||
server._callHandler(mongoReply.responseTo, mongoReply, null); | ||
} else { | ||
@@ -482,9 +469,9 @@ var chainedIds = callbackInfo.info.chained; | ||
// Remove listeners | ||
for(var i = 0; i < chainedIds.length; i++) dbInstanceObject._removeHandler(chainedIds[i]); | ||
for(var i = 0; i < chainedIds.length; i++) server._removeHandler(chainedIds[i]); | ||
// Call the handler | ||
dbInstanceObject._callHandler(mongoReply.responseTo, callbackInfo.info.results.shift(), null); | ||
server._callHandler(mongoReply.responseTo, callbackInfo.info.results.shift(), null); | ||
} else{ | ||
// Add the results to all the results | ||
for(var i = 0; i < chainedIds.length; i++) { | ||
var handler = dbInstanceObject._findHandler(chainedIds[i]); | ||
var handler = server._findHandler(chainedIds[i]); | ||
// Check if we have an object, if it's the case take the current object commands and | ||
@@ -543,3 +530,3 @@ // and add this one | ||
dbInstanceObject._callHandler(mongoReply.responseTo, mongoReply, null); | ||
server._callHandler(mongoReply.responseTo, mongoReply, null); | ||
}); | ||
@@ -546,0 +533,0 @@ } |
@@ -13,2 +13,4 @@ var Server = require("../server").Server; | ||
this.Db = require("../../db").Db; | ||
// Active db connections | ||
this.dbs = {}; | ||
} | ||
@@ -129,15 +131,21 @@ | ||
if(self.state == 'disconnected') return; | ||
var addresses = self.replicaset._state.addresses; | ||
// Grab all servers | ||
var serverKeys = Object.keys(addresses); | ||
// Create a list of all servers we can send the ismaster command to | ||
var allServers = self.replicaset._state.master != null ? [self.replicaset._state.master] : []; | ||
// Secondary keys | ||
var keys = Object.keys(self.replicaset._state.secondaries); | ||
// Add all secondaries | ||
for(var i = 0; i < keys.length; i++) { | ||
allServers.push(self.replicaset._state.secondaries[keys[i]]); | ||
} | ||
// Number of server entries | ||
var numberOfEntries = serverKeys.length; | ||
var numberOfEntries = allServers.length; | ||
// We got keys | ||
for(var i = 0; i < serverKeys.length; i++) { | ||
for(var i = 0; i < allServers.length; i++) { | ||
// We got a server instance | ||
var server = addresses[serverKeys[i]]; | ||
var server = allServers[i]; | ||
@@ -147,29 +155,84 @@ // Create a new server object, avoid using internal connections as they might | ||
new function(serverInstance) { | ||
var options = { poolSize: 1, timeout: 500, auto_reconnect: false }; | ||
var server = new Server(serverInstance.host, serverInstance.port, options); | ||
var db = new self.Db(self.replicaset.db.databaseName, server, { safe: true }); | ||
var _db = self.dbs[serverInstance.host + ":" + serverInstance.port]; | ||
// If we have a db | ||
if(_db != null) { | ||
// Startup time of the command | ||
var startTime = Date.now(); | ||
db.on("error", done); | ||
// Execute ping command in own scope | ||
var _ping = function(__db, __serverInstance) { | ||
// Execute ping on this connection | ||
__db.executeDbCommand({ping:1}, {failFast:true}, function(err) { | ||
if(err) { | ||
delete self.dbs[__db.serverConfig.host + ":" + __db.serverConfig.port]; | ||
__db.close(); | ||
return done(); | ||
} | ||
// Open the db instance | ||
db.open(function(err, _db) { | ||
if(err) return done(err, _db); | ||
if(null != __serverInstance.runtimeStats && __serverInstance.isConnected()) { | ||
__serverInstance.runtimeStats['pingMs'] = Date.now() - startTime; | ||
} | ||
// Startup time of the command | ||
var startTime = Date.now(); | ||
done(); | ||
}); | ||
}; | ||
// Ping | ||
_ping(_db, serverInstance); | ||
} else { | ||
// Create a new master connection | ||
var _server = new Server(serverInstance.host, serverInstance.port, { | ||
auto_reconnect: false, | ||
returnIsMasterResults: true, | ||
slaveOk: true, | ||
poolSize: 1, | ||
socketOptions: { connectTimeoutMS: self.replicaset._connectTimeoutMS }, | ||
ssl: self.replicaset.ssl, | ||
sslValidate: self.replicaset.sslValidate, | ||
sslCA: self.replicaset.sslCA, | ||
sslCert: self.replicaset.sslCert, | ||
sslKey: self.replicaset.sslKey, | ||
sslPass: self.replicaset.sslPass | ||
}); | ||
// Execute ping on this connection | ||
db.executeDbCommand({ping:1}, {failFast:true}, function() { | ||
if(null != serverInstance.runtimeStats && serverInstance.isConnected()) { | ||
serverInstance.runtimeStats['pingMs'] = Date.now() - startTime; | ||
} | ||
done(null, _db); | ||
// Create Db instance | ||
var _db = new self.Db(self.replicaset.db.databaseName, _server, { safe: true }); | ||
_db.on("close", function() { | ||
delete self.dbs[this.serverConfig.host + ":" + this.serverConfig.port]; | ||
}) | ||
}) | ||
function done (err, _db) { | ||
// Close connection | ||
if (_db) _db.close(true); | ||
var _ping = function(__db, __serverInstance) { | ||
__db.open(function(err, db) { | ||
if(err) { | ||
delete self.dbs[__db.serverConfig.host + ":" + __db.serverConfig.port]; | ||
__db.close(); | ||
return done(); | ||
} | ||
// Save instance | ||
self.dbs[__db.serverConfig.host + ":" + __db.serverConfig.port] = __db; | ||
// Startup time of the command | ||
var startTime = Date.now(); | ||
// Execute ping on this connection | ||
__db.executeDbCommand({ping:1}, {failFast:true}, function(err) { | ||
if(err) { | ||
delete self.dbs[__db.serverConfig.host + ":" + __db.serverConfig.port]; | ||
__db.close(); | ||
return done(); | ||
} | ||
if(null != __serverInstance.runtimeStats && __serverInstance.isConnected()) { | ||
__serverInstance.runtimeStats['pingMs'] = Date.now() - startTime; | ||
} | ||
done(); | ||
}); | ||
}); | ||
}; | ||
_ping(_db, serverInstance); | ||
} | ||
function done() { | ||
// Adjust the number of checks | ||
@@ -188,5 +251,6 @@ numberOfEntries--; | ||
// Start pingFunction | ||
setTimeout(pingFunction, 1000); | ||
// setTimeout(pingFunction, 1000); | ||
pingFunction(); | ||
callback && callback(null); | ||
} |
{ "name" : "mongodb" | ||
, "description" : "A node.js driver for MongoDB" | ||
, "keywords" : ["mongodb", "mongo", "driver", "db"] | ||
, "version" : "1.2.10" | ||
, "version" : "1.2.11" | ||
, "author" : "Christian Amor Kvalheim <christkv@gmail.com>" | ||
@@ -64,3 +64,3 @@ , "contributors" : [ "Aaron Heckmann", | ||
, "dependencies" : { | ||
"bson": "0.1.5" | ||
"bson": "0.1.6" | ||
} | ||
@@ -67,0 +67,0 @@ , "devDependencies": { |
Sorry, the diff of this file is too big to display
595900
13755
+ Addedbson@0.1.6(transitive)
- Removedbson@0.1.5(transitive)
Updatedbson@0.1.6