Comparing version 1.0.2 to 1.1.0-beta
@@ -15,5 +15,4 @@ /*! | ||
*/ | ||
function Admin(db) { | ||
function Admin(db) { | ||
if(!(this instanceof Admin)) return new Admin(db); | ||
this.db = db; | ||
@@ -25,3 +24,3 @@ }; | ||
* instance of the db client | ||
* | ||
* | ||
* @param {Function} callback Callback function of format `function(err, result) {}`. | ||
@@ -38,3 +37,3 @@ * @return {null} Returns no result | ||
* instance of the db client | ||
* | ||
* | ||
* @param {Function} callback Callback function of format `function(err, result) {}`. | ||
@@ -45,5 +44,3 @@ * @return {null} Returns no result | ||
Admin.prototype.serverInfo = function(callback) { | ||
var self = this; | ||
var command = {buildinfo:1}; | ||
this.command(command, function(err, doc) { | ||
this.db.executeDbAdminCommand({buildinfo:1}, function(err, doc) { | ||
if(err != null) return callback(err, null); | ||
@@ -64,11 +61,8 @@ return callback(null, doc.documents[0]); | ||
this.command({serverStatus: 1}, function(err, result) { | ||
if (err == null && result.documents[0].ok == 1) { | ||
callback(null, result.documents[0]); | ||
this.db.executeDbAdminCommand({serverStatus: 1}, function(err, doc) { | ||
if(err == null && doc.documents[0].ok === 1) { | ||
callback(null, doc.documents[0]); | ||
} else { | ||
if (err) { | ||
callback(err, false); | ||
} else { | ||
callback(self.wrap(result.documents[0]), false); | ||
} | ||
if(err) return callback(err, false); | ||
return callback(self.wrap(doc.documents[0]), false); | ||
} | ||
@@ -80,3 +74,3 @@ }); | ||
* Retrieve the current profiling Level for MongoDB | ||
* | ||
* | ||
* @param {Function} callback Callback function of format `function(err, result) {}`. | ||
@@ -88,18 +82,12 @@ * @return {null} Returns no result | ||
var self = this; | ||
var command = {profile:-1}; | ||
this.command(command, function(err, doc) { | ||
this.db.executeDbAdminCommand({profile:-1}, function(err, doc) { | ||
doc = doc.documents[0]; | ||
if(err == null && (doc.ok == 1 || typeof doc.was === 'number')) { | ||
if(err == null && doc.ok === 1) { | ||
var was = doc.was; | ||
if(was == 0) { | ||
callback(null, "off"); | ||
} else if(was == 1) { | ||
callback(null, "slow_only"); | ||
} else if(was == 2) { | ||
callback(null, "all"); | ||
} else { | ||
callback(new Error("Error: illegal profiling level value " + was), null); | ||
} | ||
if(was == 0) return callback(null, "off"); | ||
if(was == 1) return callback(null, "slow_only"); | ||
if(was == 2) return callback(null, "all"); | ||
return callback(new Error("Error: illegal profiling level value " + was), null); | ||
} else { | ||
@@ -114,3 +102,2 @@ err != null ? callback(err, null) : callback(new Error("Error with profile command"), null); | ||
* | ||
* @param {Object} [options] Optional parameters to the command. | ||
* @param {Function} callback Callback function of format `function(err, result) {}`. | ||
@@ -124,11 +111,4 @@ * @return {null} Returns no result | ||
callback = args.pop(); | ||
options = args.length ? args.shift() : {}; | ||
// Set self | ||
var self = this; | ||
var databaseName = this.db.databaseName; | ||
this.db.databaseName = 'admin'; | ||
this.db.executeDbCommand({ping:1}, options, function(err, result) { | ||
self.db.databaseName = databaseName; | ||
return callback(err, result); | ||
}) | ||
this.db.executeDbAdminCommand({ping: 1}, callback); | ||
} | ||
@@ -138,3 +118,3 @@ | ||
* Authenticate against MongoDB | ||
* | ||
* | ||
* @param {String} username The user name for the authentication. | ||
@@ -147,8 +127,4 @@ * @param {String} password The password for the authentication. | ||
Admin.prototype.authenticate = function(username, password, callback) { | ||
var self = this; | ||
var databaseName = this.db.databaseName; | ||
this.db.databaseName = 'admin'; | ||
this.db.authenticate(username, password, function(err, result) { | ||
self.db.databaseName = databaseName; | ||
return callback(err, result); | ||
this.db.authenticate(username, password, {authdb: 'admin'}, function(err, doc) { | ||
return callback(err, doc); | ||
}) | ||
@@ -160,3 +136,3 @@ } | ||
* | ||
* @param {Object} [options] Optional parameters to the command. | ||
* @param {Object} [options] Optional parameters to the command. | ||
* @param {Function} callback Callback function of format `function(err, result) {}`. | ||
@@ -167,10 +143,5 @@ * @return {null} Returns no result | ||
Admin.prototype.logout = function(callback) { | ||
var self = this; | ||
var databaseName = this.db.databaseName; | ||
this.db.databaseName = 'admin'; | ||
this.db.logout(function(err, result) { | ||
return callback(err, result); | ||
}) | ||
self.db.databaseName = databaseName; | ||
this.db.logout({authdb: 'admin'}, function(err, doc) { | ||
return callback(err, doc); | ||
}) | ||
} | ||
@@ -193,3 +164,2 @@ | ||
Admin.prototype.addUser = function(username, password, options, callback) { | ||
var self = this; | ||
var args = Array.prototype.slice.call(arguments, 2); | ||
@@ -199,9 +169,7 @@ callback = args.pop(); | ||
var self = this; | ||
var databaseName = this.db.databaseName; | ||
this.db.databaseName = 'admin'; | ||
this.db.addUser(username, password, options, function(err, result) { | ||
self.db.databaseName = databaseName; | ||
return callback(err, result); | ||
}) | ||
options.dbName = 'admin'; | ||
// Add user | ||
this.db.addUser(username, password, options, function(err, doc) { | ||
return callback(err, doc); | ||
}) | ||
} | ||
@@ -227,9 +195,7 @@ | ||
var self = this; | ||
var databaseName = this.db.databaseName; | ||
this.db.databaseName = 'admin'; | ||
this.db.removeUser(username, options, function(err, result) { | ||
self.db.databaseName = databaseName; | ||
return callback(err, result); | ||
}) | ||
options.dbName = 'admin'; | ||
// Remove the user | ||
this.db.removeUser(username, options, function(err, doc) { | ||
return callback(err, doc); | ||
}) | ||
} | ||
@@ -239,3 +205,3 @@ | ||
* Set the current profiling level of MongoDB | ||
* | ||
* | ||
* @param {String} level The new profiling level (off, slow_only, all) | ||
@@ -262,12 +228,10 @@ * @param {Function} callback Callback function of format `function(err, result) {}`. | ||
// Set up the profile number | ||
command['profile'] = profile; | ||
// Execute the command to set the profiling level | ||
this.command(command, function(err, doc) { | ||
command['profile'] = profile; | ||
this.db.executeDbAdminCommand(command, function(err, doc) { | ||
doc = doc.documents[0]; | ||
if(err == null && (doc.ok == 1 || typeof doc.was === 'number')) { | ||
if(err == null && doc.ok === 1) | ||
return callback(null, level); | ||
} else { | ||
return err != null ? callback(err, null) : callback(new Error("Error with profile command"), null); | ||
} | ||
return err != null ? callback(err, null) : callback(new Error("Error with profile command"), null); | ||
}); | ||
@@ -278,3 +242,3 @@ }; | ||
* Retrive the current profiling information for MongoDB | ||
* | ||
* | ||
* @param {Function} callback Callback function of format `function(err, result) {}`. | ||
@@ -285,15 +249,11 @@ * @return {null} Returns no result | ||
Admin.prototype.profilingInfo = function(callback) { | ||
var self = this; | ||
var databaseName = this.db.databaseName; | ||
this.db.databaseName = 'admin'; | ||
try { | ||
new Cursor(this.db, new Collection(this.db, DbCommand.SYSTEM_PROFILE_COLLECTION), {}).toArray(function(err, items) { | ||
return callback(err, items); | ||
}); | ||
new Cursor(this.db, new Collection(this.db, DbCommand.SYSTEM_PROFILE_COLLECTION), {}, null, null, null | ||
, null, null, null, null, null, null, null, null, null, null | ||
, null, null, null, null, null, null, null, null, 'admin').toArray(function(err, items) { | ||
return callback(err, items); | ||
}); | ||
} catch (err) { | ||
return callback(err, null); | ||
} | ||
self.db.databaseName = databaseName; | ||
}; | ||
@@ -303,5 +263,5 @@ | ||
* Execute a db command against the Admin database | ||
* | ||
* | ||
* @param {Object} command A command object `{ping:1}`. | ||
* @param {Object} [options] Optional parameters to the command. | ||
* @param {Object} [options] Optional parameters to the command. | ||
* @param {Function} callback Callback function of format `function(err, result) {}`. | ||
@@ -318,5 +278,5 @@ * @return {null} Returns no result | ||
// Execute a command | ||
this.db.executeDbAdminCommand(command, options, function(err, result) { | ||
this.db.executeDbAdminCommand(command, options, function(err, doc) { | ||
// Ensure change before event loop executes | ||
return callback != null ? callback(err, result) : null; | ||
return callback != null ? callback(err, doc) : null; | ||
}); | ||
@@ -327,5 +287,5 @@ } | ||
* Validate an existing collection | ||
* | ||
* | ||
* @param {String} collectionName The name of the collection to validate. | ||
* @param {Object} [options] Optional parameters to the command. | ||
* @param {Object} [options] Optional parameters to the command. | ||
* @param {Function} callback Callback function of format `function(err, result) {}`. | ||
@@ -343,3 +303,3 @@ * @return {null} Returns no result | ||
var keys = Object.keys(options); | ||
// Decorate command with extra options | ||
@@ -353,16 +313,15 @@ for(var i = 0; i < keys.length; i++) { | ||
this.db.executeDbCommand(command, function(err, doc) { | ||
if(err != null) return callback(err, null); | ||
if(err != null) return callback(err, null); | ||
doc = doc.documents[0]; | ||
if(doc.ok == 0) { | ||
if(doc.ok === 0) | ||
return callback(new Error("Error with validate command"), null); | ||
} else if(doc.result != null && doc.result.constructor != String) { | ||
if(doc.result != null && doc.result.constructor != String) | ||
return callback(new Error("Error with validation data"), null); | ||
} else if(doc.result != null && doc.result.match(/exception|corrupt/) != null) { | ||
if(doc.result != null && doc.result.match(/exception|corrupt/) != null) | ||
return callback(new Error("Error: invalid collection " + collectionName), null); | ||
} else if(doc.valid != null && !doc.valid) { | ||
return callback(new Error("Error: invalid collection " + collectionName), null); | ||
} else { | ||
return callback(null, doc); | ||
} | ||
if(doc.valid != null && !doc.valid) | ||
return callback(new Error("Error: invalid collection " + collectionName), null); | ||
return callback(null, doc); | ||
}); | ||
@@ -373,3 +332,3 @@ }; | ||
* List the available databases | ||
* | ||
* | ||
* @param {Function} callback Callback function of format `function(err, result) {}`. | ||
@@ -381,9 +340,6 @@ * @return {null} Returns no result | ||
// Execute the listAllDatabases command | ||
this.db.executeDbAdminCommand({listDatabases:1}, {}, function(err, result) { | ||
if(err != null) { | ||
callback(err, null); | ||
} else { | ||
callback(null, result.documents[0]); | ||
} | ||
}); | ||
this.db.executeDbAdminCommand({listDatabases:1}, {}, function(err, doc) { | ||
if(err != null) return callback(err, null); | ||
return callback(null, doc.documents[0]); | ||
}); | ||
} | ||
@@ -401,12 +357,7 @@ | ||
this.command({replSetGetStatus:1}, function(err, result) { | ||
if (err == null && result.documents[0].ok == 1) { | ||
callback(null, result.documents[0]); | ||
} else { | ||
if (err) { | ||
callback(err, false); | ||
} else { | ||
callback(self.db.wrap(result.documents[0]), false); | ||
} | ||
} | ||
this.db.executeDbAdminCommand({replSetGetStatus:1}, function(err, doc) { | ||
if(err == null && doc.documents[0].ok === 1) | ||
return callback(null, doc.documents[0]); | ||
if(err) return callback(err, false); | ||
return callback(self.db.wrap(doc.documents[0]), false); | ||
}); | ||
@@ -413,0 +364,0 @@ }; |
@@ -45,3 +45,3 @@ /** | ||
if(!(this instanceof Collection)) return new Collection(db, collectionName, pkFactory, options); | ||
checkCollectionName(collectionName); | ||
@@ -56,6 +56,10 @@ | ||
this.raw = options == null || options.raw == null ? db.raw : options.raw; | ||
this.readPreference = options == null || options.readPreference == null ? db.serverConfig.readPreference : options.readPreference; | ||
this.readPreference = this.readPreference == null ? 'primary' : this.readPreference; | ||
this.pkFactory = pkFactory == null | ||
? ObjectID | ||
: pkFactory; | ||
var self = this; | ||
@@ -82,3 +86,3 @@ } | ||
if(!('function' === typeof callback)) callback = null; | ||
var self = this; | ||
var self = this; | ||
insertAll(self, Array.isArray(docs) ? docs : [docs], options, callback); | ||
@@ -131,6 +135,6 @@ return this; | ||
} | ||
// Ensure options | ||
if(options == null) options = {}; | ||
if(!('function' === typeof callback)) callback = null; | ||
if(!('function' === typeof callback)) callback = null; | ||
// Ensure we have at least an empty selector | ||
@@ -140,7 +144,14 @@ selector = selector == null ? {} : selector; | ||
var flags = 0 | (options.single ? 1 : 0); | ||
// DbName | ||
var dbName = options['dbName']; | ||
// If no dbname defined use the db one | ||
if(dbName == null) { | ||
dbName = this.db.databaseName; | ||
} | ||
// Create a delete command | ||
var deleteCommand = new DeleteCommand( | ||
this.db | ||
, this.db.databaseName + "." + this.collectionName | ||
, dbName + "." + this.collectionName | ||
, selector | ||
@@ -175,3 +186,3 @@ , flags); | ||
error = error && error.documents; | ||
if(!callback) return; | ||
if(!callback) return; | ||
@@ -184,6 +195,6 @@ if(err) { | ||
callback(null, error[0].n); | ||
} | ||
}); | ||
} | ||
}); | ||
} else { | ||
var result = this.db._executeRemoveCommand(deleteCommand); | ||
var result = this.db._executeRemoveCommand(deleteCommand); | ||
// If no callback just return | ||
@@ -214,3 +225,3 @@ if (!callback) return; | ||
self.db._executeQueryCommand(DbCommand.createRenameCollectionCommand(self.db, self.collectionName, newName), function(err, result) { | ||
if(err == null && result.documents[0].ok == 1) { | ||
if(err == null && result.documents[0].ok == 1) { | ||
if(callback != null) { | ||
@@ -234,3 +245,3 @@ // Set current object to point to the new name | ||
var insertAll = function insertAll (self, docs, options, callback) { | ||
if('function' === typeof options) callback = options, options = {}; | ||
if('function' === typeof options) callback = options, options = {}; | ||
if(options == null) options = {}; | ||
@@ -245,3 +256,10 @@ if(!('function' === typeof callback)) callback = null; | ||
} | ||
// DbName | ||
var dbName = options['dbName']; | ||
// If no dbname defined use the db one | ||
if(dbName == null) { | ||
dbName = self.db.databaseName; | ||
} | ||
// Either use override on the function, or go back to default on either the collection | ||
@@ -254,7 +272,7 @@ // level or db | ||
} | ||
// Pass in options | ||
var insertCommand = new InsertCommand( | ||
self.db | ||
, self.db.databaseName + "." + self.collectionName, true, insertFlags); | ||
, dbName + "." + self.collectionName, true, insertFlags); | ||
@@ -264,3 +282,3 @@ // Add the documents and decorate them with id's if they have none | ||
var doc = docs[index]; | ||
// Add id to each document if it's not already defined | ||
@@ -273,3 +291,3 @@ if (!(Buffer.isBuffer(doc)) && doc['_id'] == null && self.db.forceServerObjectId != true) { | ||
} | ||
// Collect errorOptions | ||
@@ -282,5 +300,5 @@ var errorOptions = options.safe != null ? options.safe : null; | ||
if(errorOptions && errorOptions['safe'] != false && typeof callback !== 'function') throw new Error("safe cannot be used without a callback"); | ||
// Default command options | ||
var commandOptions = {}; | ||
var commandOptions = {}; | ||
// If safe is defined check for error message | ||
@@ -292,3 +310,3 @@ if(errorOptions && errorOptions != false) { | ||
if(errorOptions == null) commandOptions['async'] = true; | ||
// Set safe option | ||
@@ -303,7 +321,7 @@ commandOptions['safe'] = errorOptions; | ||
} | ||
// Execute command with safe options (rolls up both command and safe command into one and executes them on the same connection) | ||
self.db._executeInsertCommand(insertCommand, commandOptions, function (err, error) { | ||
error = error && error.documents; | ||
if(!callback) return; | ||
if(!callback) return; | ||
@@ -316,6 +334,6 @@ if (err) { | ||
callback(null, docs); | ||
} | ||
}); | ||
} else { | ||
var result = self.db._executeInsertCommand(insertCommand, commandOptions); | ||
} | ||
}); | ||
} else { | ||
var result = self.db._executeInsertCommand(insertCommand, commandOptions); | ||
// If no callback just return | ||
@@ -350,3 +368,3 @@ if(!callback) return; | ||
var errorOptions = options.safe != null ? options.safe : false; | ||
var errorOptions = options.safe != null ? options.safe : false; | ||
errorOptions = errorOptions == null && this.opts.safe != null ? this.opts.safe : errorOptions; | ||
@@ -392,2 +410,9 @@ // Extract the id, if we have one we need to do a update command | ||
// DbName | ||
var dbName = options['dbName']; | ||
// If no dbname defined use the db one | ||
if(dbName == null) { | ||
dbName = this.db.databaseName; | ||
} | ||
// Either use override on the function, or go back to default on either the collection | ||
@@ -399,7 +424,7 @@ // level or db | ||
options['serializeFunctions'] = this.serializeFunctions; | ||
} | ||
} | ||
var updateCommand = new UpdateCommand( | ||
this.db | ||
, this.db.databaseName + "." + this.collectionName | ||
, dbName + "." + this.collectionName | ||
, selector | ||
@@ -411,3 +436,3 @@ , document | ||
// Unpack the error options if any | ||
var errorOptions = (options && options.safe != null) ? options.safe : null; | ||
var errorOptions = (options && options.safe != null) ? options.safe : null; | ||
errorOptions = errorOptions == null && this.opts.safe != null ? this.opts.safe : errorOptions; | ||
@@ -418,5 +443,5 @@ errorOptions = errorOptions == null && this.db.strict != null ? this.db.strict : errorOptions; | ||
if(errorOptions && errorOptions['safe'] != false && typeof callback !== 'function') throw new Error("safe cannot be used without a callback"); | ||
// If we are executing in strict mode or safe both the update and the safe command must happen on the same line | ||
if(errorOptions && errorOptions != false) { | ||
if(errorOptions && errorOptions != false) { | ||
// Insert options | ||
@@ -439,3 +464,3 @@ var commandOptions = {read:false}; | ||
error = error && error.documents; | ||
if(!callback) return; | ||
if(!callback) return; | ||
@@ -449,7 +474,7 @@ if(err) { | ||
callback(null, error[0].n, error[0]); | ||
} | ||
}); | ||
} | ||
}); | ||
} else { | ||
// Execute update | ||
var result = this.db._executeUpdateCommand(updateCommand); | ||
var result = this.db._executeUpdateCommand(updateCommand); | ||
// If no callback just return | ||
@@ -467,6 +492,10 @@ if (!callback) return; | ||
/** | ||
* The distinct command returns returns a list of distinct values for the given key across a collection. | ||
* The distinct command returns returns a list of distinct values for the given key across a collection. | ||
* | ||
* Options | ||
* - **readPreference** {String}, the preferred read preference (Server.PRIMARY, Server.PRIMARY_PREFERRED, Server.SECONDARY, Server.SECONDARY_PREFERRED, Server.NEAREST). | ||
* | ||
* @param {String} key key to run distinct against. | ||
* @param {Object} [query] option query to narrow the returned objects. | ||
* @param {Object} [options] additional options during update. | ||
* @param {Function} callback must be provided. | ||
@@ -476,22 +505,24 @@ * @return {null} | ||
*/ | ||
Collection.prototype.distinct = function distinct(key, query, callback) { | ||
if ('function' === typeof query) callback = query, query = {}; | ||
Collection.prototype.distinct = function distinct(key, query, options, callback) { | ||
var args = Array.prototype.slice.call(arguments, 1); | ||
callback = args.pop(); | ||
query = args.length ? args.shift() : {}; | ||
options = args.length ? args.shift() : {}; | ||
var mapCommandHash = { | ||
distinct: this.collectionName | ||
, query: query | ||
, key: key | ||
'distinct': this.collectionName | ||
, 'query': query | ||
, 'key': key | ||
}; | ||
// Set read preference if we set one | ||
var readPreference = options['readPreference'] ? options['readPreference'] : false; | ||
// Create the command | ||
var cmd = DbCommand.createDbSlaveOkCommand(this.db, mapCommandHash); | ||
this.db._executeQueryCommand(cmd, {read:true}, function (err, result) { | ||
if (err) { | ||
this.db._executeQueryCommand(cmd, {read:readPreference}, function (err, result) { | ||
if(err) | ||
return callback(err); | ||
} | ||
if (result.documents[0].ok != 1) { | ||
if(result.documents[0].ok != 1) | ||
return callback(new Error(result.documents[0].errmsg)); | ||
} | ||
callback(null, result.documents[0].values); | ||
@@ -504,3 +535,7 @@ }); | ||
* | ||
* Options | ||
* - **readPreference** {String}, the preferred read preference (Server.PRIMARY, Server.PRIMARY_PREFERRED, Server.SECONDARY, Server.SECONDARY_PREFERRED, Server.NEAREST). | ||
* | ||
* @param {Object} [query] query to filter by before performing count. | ||
* @param {Object} [options] additional options during update. | ||
* @param {Function} callback must be provided. | ||
@@ -510,11 +545,19 @@ * @return {null} | ||
*/ | ||
Collection.prototype.count = function count (query, callback) { | ||
if ('function' === typeof query) callback = query, query = {}; | ||
Collection.prototype.count = function count (query, options, callback) { | ||
var args = Array.prototype.slice.call(arguments, 0); | ||
callback = args.pop(); | ||
query = args.length ? args.shift() : {}; | ||
options = args.length ? args.shift() : {}; | ||
// Final query | ||
var final_query = { | ||
count: this.collectionName | ||
, query: query | ||
, fields: null | ||
'count': this.collectionName | ||
, 'query': query | ||
, 'fields': null | ||
}; | ||
// Set read preference if we set one | ||
var readPreference = options['readPreference'] ? options['readPreference'] : false; | ||
// Set up query options | ||
var queryOptions = QueryCommand.OPTS_NO_CURSOR_TIMEOUT; | ||
@@ -536,13 +579,9 @@ if (this.slaveOk || this.db.slaveOk) { | ||
var self = this; | ||
this.db._executeQueryCommand(queryCommand, {read:true}, function (err, result) { | ||
this.db._executeQueryCommand(queryCommand, {read:readPreference}, function (err, result) { | ||
result = result && result.documents; | ||
if(!callback) return; | ||
if(!callback) return; | ||
if (err) { | ||
callback(err); | ||
} else if (result[0].ok != 1 || result[0].errmsg) { | ||
callback(self.db.wrap(result[0])); | ||
} else { | ||
callback(null, result[0].n); | ||
} | ||
if(err) return callback(err); | ||
if (result[0].ok != 1 || result[0].errmsg) return callback(self.db.wrap(result[0])); | ||
callback(null, result[0].n); | ||
}); | ||
@@ -613,5 +652,5 @@ }; | ||
} | ||
// Unpack the error options if any | ||
var errorOptions = (options && options.safe != null) ? options.safe : null; | ||
var errorOptions = (options && options.safe != null) ? options.safe : null; | ||
errorOptions = errorOptions == null && this.opts.safe != null ? this.opts.safe : errorOptions; | ||
@@ -623,3 +662,3 @@ errorOptions = errorOptions == null && this.db.strict != null ? this.db.strict : errorOptions; | ||
// Add the find and modify command | ||
commands.push(DbCommand.createDbSlaveOkCommand(this.db, queryObject, options)); | ||
commands.push(DbCommand.createDbCommand(this.db, queryObject, options)); | ||
// If we have safe defined we need to return both call results | ||
@@ -631,18 +670,14 @@ var chainedCommands = errorOptions != null ? true : false; | ||
} | ||
// Fire commands and | ||
this.db._executeQueryCommand(commands, function(err, result) { | ||
// Fire commands and | ||
this.db._executeQueryCommand(commands, {read:false}, function(err, result) { | ||
result = result && result.documents; | ||
if(err != null) { | ||
callback(err); | ||
} else if(result[0].err != null) { | ||
callback(self.db.wrap(result[0]), null); | ||
} else if(result[0].errmsg != null && !result[0].errmsg.match(eErrorMessages)) { | ||
// Workaround due to 1.8.X returning an error on no matching object | ||
// while 2.0.X does not not, making 2.0.X behaviour standard | ||
callback(self.db.wrap(result[0]), null, result[0]); | ||
} else { | ||
return callback(null, result[0].value, result[0]); | ||
} | ||
if(err != null) return callback(err); | ||
if(result[0].err != null) return callback(self.db.wrap(result[0]), null); | ||
// Workaround due to 1.8.X returning an error on no matching object | ||
// while 2.0.X does not not, making 2.0.X behaviour standard | ||
if(result[0].errmsg != null && !result[0].errmsg.match(eErrorMessages)) | ||
return callback(self.db.wrap(result[0]), null, result[0]); | ||
return callback(null, result[0].value, result[0]); | ||
}); | ||
@@ -668,7 +703,7 @@ } | ||
sort = args.length ? args.shift() : []; | ||
options = args.length ? args.shift() : {}; | ||
options = args.length ? args.shift() : {}; | ||
// Add the remove option | ||
options['remove'] = true; | ||
// Execute the callback | ||
this.findAndModify(query, sort, null, options, callback); | ||
this.findAndModify(query, sort, null, options, callback); | ||
} | ||
@@ -678,3 +713,3 @@ | ||
, 'timeout' : 1, 'tailable' : 1, 'batchSize' : 1, 'raw' : 1, 'read' : 1 | ||
, 'returnKey' : 1, 'maxScan' : 1, 'min' : 1, 'max' : 1, 'showDiskLoc' : 1, 'comment' : 1}; | ||
, 'returnKey' : 1, 'maxScan' : 1, 'min' : 1, 'max' : 1, 'showDiskLoc' : 1, 'comment' : 1, 'dbName' : 1}; | ||
@@ -703,2 +738,3 @@ /** | ||
* - **tailable** {Boolean, default:false}, specify if the cursor is tailable. | ||
* - **awaitdata** {Boolean, default:false} allow the cursor to wait for data, only applicable for tailable cursor. | ||
* - **batchSize** {Number, default:0}, set the batchSize for the getMoreCommand when iterating over the query results. | ||
@@ -712,3 +748,4 @@ * - **returnKey** {Boolean, default:false}, only return the index key. | ||
* - **raw** {Boolean, default:false}, Return all BSON documents as Raw Buffer documents. | ||
* - **read** {Boolean, default:false}, Tell the query to read from a secondary server. | ||
* - **readPreference** {String}, the preferred read preference ((Server.PRIMARY, Server.PRIMARY_PREFERRED, Server.SECONDARY, Server.SECONDARY_PREFERRED, Server.NEAREST). | ||
* - **numberOfRetries** {Number, default:1}, if using awaidata specifies the number of times to retry on timeout. | ||
* | ||
@@ -736,7 +773,7 @@ * @param {Object} query query object to locate the object to modify | ||
} | ||
if(len === 2 && !Array.isArray(fields)) { | ||
var fieldKeys = Object.getOwnPropertyNames(fields); | ||
var is_option = false; | ||
for(var i = 0; i < fieldKeys.length; i++) { | ||
@@ -748,3 +785,3 @@ if(testForFields[fieldKeys[i]] != null) { | ||
} | ||
if(is_option) { | ||
@@ -775,3 +812,3 @@ options = fields; | ||
if(Buffer.isBuffer(object)) { | ||
var object_size = object[0] | object[1] << 8 | object[2] << 16 | object[3] << 24; | ||
var object_size = object[0] | object[1] << 8 | object[2] << 16 | object[3] << 24; | ||
if(object_size != object.length) { | ||
@@ -783,7 +820,7 @@ var error = new Error("query selector raw message size does not match message header size [" + object.length + "] != [" + object_size + "]"); | ||
} | ||
// Validate correctness of the field selector | ||
var object = fields; | ||
if(Buffer.isBuffer(object)) { | ||
var object_size = object[0] | object[1] << 8 | object[2] << 16 | object[3] << 24; | ||
var object_size = object[0] | object[1] << 8 | object[2] << 16 | object[3] << 24; | ||
if(object_size != object.length) { | ||
@@ -794,4 +831,4 @@ var error = new Error("query fields raw message size does not match message header size [" + object.length + "] != [" + object_size + "]"); | ||
} | ||
} | ||
} | ||
// Check special case where we are using an objectId | ||
@@ -828,3 +865,11 @@ if(selector instanceof ObjectID) { | ||
options.slaveOk = options.slaveOk != null ? options.slaveOk : this.db.slaveOk; | ||
// Set option | ||
var o = options; | ||
// Support read/readPreference | ||
if(o["read"] != null) o["readPreference"] = o["read"]; | ||
// Set the read preference | ||
o.read = o["readPreference"] ? o.readPreference : this.readPreference; | ||
// Adjust slave ok if read preference is secondary or secondary only | ||
if(o.read == "secondary" || o.read == "secondaryOnly") options.slaveOk = true; | ||
@@ -836,7 +881,9 @@ // callback for backward compatibility | ||
, o.sort, o.hint, o.explain, o.snapshot, o.timeout, o.tailable, o.batchSize | ||
, o.slaveOk, o.raw, o.read, o.returnKey, o.maxScan, o.min, o.max, o.showDiskLoc, o.comment)); | ||
, o.slaveOk, o.raw, o.read, o.returnKey, o.maxScan, o.min, o.max, o.showDiskLoc, o.comment, o.awaitdata | ||
, o.numberOfRetries, o.dbName)); | ||
} else { | ||
return new Cursor(this.db, this, selector, fields, o.skip, o.limit | ||
, o.sort, o.hint, o.explain, o.snapshot, o.timeout, o.tailable, o.batchSize | ||
, o.slaveOk, o.raw, o.read, o.returnKey, o.maxScan, o.min, o.max, o.showDiskLoc, o.comment); | ||
, o.slaveOk, o.raw, o.read, o.returnKey, o.maxScan, o.min, o.max, o.showDiskLoc, o.comment, o.awaitdata | ||
, o.numberOfRetries, o.dbName); | ||
} | ||
@@ -909,3 +956,3 @@ }; | ||
* - **raw** {Boolean, default:false}, Return all BSON documents as Raw Buffer documents. | ||
* - **read** {Boolean, default:false}, Tell the query to read from a secondary server. | ||
* - **readPreference** {String}, the preferred read preference (Server.PRIMARY, Server.PRIMARY_PREFERRED, Server.SECONDARY, Server.SECONDARY_PREFERRED, Server.NEAREST). | ||
* | ||
@@ -926,4 +973,4 @@ * @param {Object} query query object to locate the object to modify | ||
if(err != null) return callback(err instanceof Error ? err : self.db.wrap(new Error(err)), null); | ||
if(items.length == 1) return callback(null, items[0]); | ||
callback(null, null); | ||
if(items.length == 1) return callback(null, items[0]); | ||
callback(null, null); | ||
}); | ||
@@ -936,3 +983,3 @@ }; | ||
* Options | ||
* - **safe** {true | {w:n, wtimeout:n} | {fsync:true}, default:false}, executes with a | ||
* - **safe** {true | {w:n, wtimeout:n} | {fsync:true}, default:false}, executes with a | ||
* - **unique** {Boolean, default:false}, creates an unique index. | ||
@@ -963,3 +1010,3 @@ * - **sparse** {Boolean, default:false}, creates a sparse index. | ||
errorOptions = errorOptions == null && this.db.strict != null ? this.db.strict : errorOptions; | ||
// If we have a write concern set and no callback throw error | ||
@@ -976,3 +1023,3 @@ if(errorOptions != null && errorOptions != false && (typeof callback !== 'function' && typeof options !== 'function')) throw new Error("safe cannot be used without a callback"); | ||
* Options | ||
* - **safe** {true | {w:n, wtimeout:n} | {fsync:true}, default:false}, executes with a | ||
* - **safe** {true | {w:n, wtimeout:n} | {fsync:true}, default:false}, executes with a | ||
* - **unique** {Boolean, default:false}, creates an unique index. | ||
@@ -1002,3 +1049,3 @@ * - **sparse** {Boolean, default:false}, creates a sparse index. | ||
} | ||
// Collect errorOptions | ||
@@ -1008,6 +1055,6 @@ var errorOptions = options.safe != null ? options.safe : null; | ||
errorOptions = errorOptions == null && this.db.strict != null ? this.db.strict : errorOptions; | ||
// If we have a write concern set and no callback throw error | ||
if(errorOptions != null && errorOptions != false && (typeof callback !== 'function' && typeof options !== 'function')) throw new Error("safe cannot be used without a callback"); | ||
// Execute create index | ||
@@ -1084,3 +1131,3 @@ this.db.ensureIndex(this.collectionName, fieldOrSpec, options, callback); | ||
* @return {null} | ||
* @api public | ||
* @api public | ||
**/ | ||
@@ -1141,7 +1188,7 @@ Collection.prototype.reIndex = function(callback) { | ||
} | ||
var self = this; | ||
var cmd = DbCommand.createDbSlaveOkCommand(this.db, mapCommandHash); | ||
var cmd = DbCommand.createDbCommand(this.db, mapCommandHash); | ||
this.db._executeQueryCommand(cmd, {read:true}, function (err, result) { | ||
this.db._executeQueryCommand(cmd, {read:false}, function (err, result) { | ||
if (err) { | ||
@@ -1151,3 +1198,3 @@ return callback(err); | ||
// | ||
// | ||
if (1 != result.documents[0].ok || result.documents[0].err || result.documents[0].errmsg) { | ||
@@ -1164,21 +1211,25 @@ return callback(self.db.wrap(result.documents[0])); | ||
// invoked with inline? | ||
if (result.documents[0].results) { | ||
if(result.documents[0].results) { | ||
return callback(null, result.documents[0].results, stats); | ||
} | ||
// Create a collection object that wraps the result collection | ||
self.db.collection(result.documents[0].result, function (err, collection) { | ||
// If we wish for no verbosity | ||
if(options['verbose'] == null || !options['verbose']) { | ||
return callback(err, collection); | ||
} | ||
// The returned collection | ||
var collection = null; | ||
// Create statistics value | ||
var stats = {}; | ||
if(result.documents[0].timeMillis) stats['processtime'] = result.documents[0].timeMillis; | ||
if(result.documents[0].counts) stats['counts'] = result.documents[0].counts; | ||
if(result.documents[0].timing) stats['timing'] = result.documents[0].timing; | ||
// Return stats as third set of values | ||
callback(err, collection, stats); | ||
}); | ||
// If we have an object it's a different db | ||
if(result.documents[0].result != null && typeof result.documents[0].result == 'object') { | ||
var doc = result.documents[0].result; | ||
collection = self.db.db(doc.db).collection(doc.collection); | ||
} else { | ||
// Create a collection object that wraps the result collection | ||
collection = self.db.collection(result.documents[0].result) | ||
} | ||
// If we wish for no verbosity | ||
if(options['verbose'] == null || !options['verbose']) { | ||
return callback(err, collection); | ||
} | ||
// Return stats as third set of values | ||
callback(err, collection, stats); | ||
}); | ||
@@ -1221,9 +1272,13 @@ }; | ||
* Run a group command across a collection | ||
* | ||
* Options | ||
* - **readPreference** {String}, the preferred read preference (Server.PRIMARY, Server.PRIMARY_PREFERRED, Server.SECONDARY, Server.SECONDARY_PREFERRED, Server.NEAREST). | ||
* | ||
* @param {Object|Array|Function|Code} keys an object, array or function expressing the keys to group by. | ||
* @param {Object} condition an optional condition that must be true for a row to be considered. | ||
* @param {Object} initial initial value of the aggregation counter object. | ||
* @param {Object} initial initial value of the aggregation counter object. | ||
* @param {Function|Code} reduce the reduce function aggregates (reduces) the objects iterated | ||
* @param {Function|Code} finalize an optional function to be run on each item in the result set just before the item is returned. | ||
* @param {Boolean} command specify if you wish to run using the internal group command or using eval, default is true. | ||
* @param {Object} [options] additional options during update. | ||
* @param {Function} callback returns the results. | ||
@@ -1233,3 +1288,3 @@ * @return {null} | ||
*/ | ||
Collection.prototype.group = function group(keys, condition, initial, reduce, finalize, command, callback) { | ||
Collection.prototype.group = function group(keys, condition, initial, reduce, finalize, command, options, callback) { | ||
var args = Array.prototype.slice.call(arguments, 3); | ||
@@ -1241,2 +1296,3 @@ callback = args.pop(); | ||
command = args.length ? args.shift() : null; | ||
options = args.length ? args.shift() : {}; | ||
@@ -1249,17 +1305,17 @@ // Make sure we are backward compatible | ||
if (!Array.isArray(keys) && keys instanceof Object && typeof(keys) !== 'function') { | ||
if (!Array.isArray(keys) && keys instanceof Object && typeof(keys) !== 'function' && !(keys instanceof Code)) { | ||
keys = Object.keys(keys); | ||
} | ||
if(typeof reduce === 'function') { | ||
reduce = reduce.toString(); | ||
} | ||
if(typeof finalize === 'function') { | ||
finalize = finalize.toString(); | ||
} | ||
// Set up the command as default | ||
command = command == null ? true : command; | ||
// Execute using the command | ||
@@ -1278,9 +1334,9 @@ if(command) { | ||
, 'out': "inline" | ||
} | ||
} | ||
}; | ||
// if finalize is defined | ||
if(finalize != null) selector.group['finalize'] = finalize; | ||
// Set up group selector | ||
if ('function' === typeof keys) { | ||
if ('function' === typeof keys || keys instanceof Code) { | ||
selector.group.$keyf = keys instanceof Code | ||
@@ -1298,6 +1354,8 @@ ? keys | ||
var cmd = DbCommand.createDbSlaveOkCommand(this.db, selector); | ||
this.db._executeQueryCommand(cmd, {read:true}, function (err, result) { | ||
// Set read preference if we set one | ||
var readPreference = options['readPreference'] ? options['readPreference'] : false; | ||
this.db._executeQueryCommand(cmd, {read:readPreference}, function (err, result) { | ||
if(err != null) return callback(err); | ||
var document = result.documents[0]; | ||
@@ -1321,7 +1379,7 @@ if (null == document.retval) { | ||
scope.initial = initial; | ||
// Pass in the function text to execute within mongodb. | ||
var groupfn = groupFunction.replace(/ reduce;/, reduce.toString() + ';'); | ||
this.db.eval(new Code(groupfn, scope), function (err, results) { | ||
this.db.eval(new Code(groupfn, scope), function (err, results) { | ||
if (err) return callback(err, null); | ||
@@ -1361,3 +1419,3 @@ callback(null, results.result || results); | ||
} else { | ||
callback(null, document.capped); | ||
callback(null, document && document.capped); | ||
} | ||
@@ -1386,3 +1444,3 @@ }); | ||
} | ||
// All keys found return true | ||
@@ -1393,3 +1451,3 @@ return callback(null, true); | ||
} | ||
}); | ||
}); | ||
} | ||
@@ -1408,2 +1466,3 @@ | ||
* - **includeLocs** {Boolean, default:false}, include the location data fields in the top level of the results MongoDB > 2.X. | ||
* - **readPreference** {String}, the preferred read preference ((Server.PRIMARY, Server.PRIMARY_PREFERRED, Server.SECONDARY, Server.SECONDARY_PREFERRED, Server.NEAREST). | ||
* | ||
@@ -1428,3 +1487,3 @@ * @param {Number} x point to search on the x axis, ensure the indexes are ordered in the same order. | ||
} | ||
// Decorate object if any with known properties | ||
@@ -1440,3 +1499,3 @@ if(options['num'] != null) commandObject['num'] = options['num']; | ||
// Execute the command | ||
this.db.command(commandObject, callback); | ||
this.db.command(commandObject, options, callback); | ||
} | ||
@@ -1451,2 +1510,3 @@ | ||
* - **limit** {Number}, max number of results to return. | ||
* - **readPreference** {String}, the preferred read preference ((Server.PRIMARY, Server.PRIMARY_PREFERRED, Server.SECONDARY, Server.SECONDARY_PREFERRED, Server.NEAREST). | ||
* | ||
@@ -1471,3 +1531,3 @@ * @param {Number} x point to search on the x axis, ensure the indexes are ordered in the same order. | ||
} | ||
// Decorate object if any with known properties | ||
@@ -1480,3 +1540,3 @@ if(options['maxDistance'] != null) commandObject['maxDistance'] = options['maxDistance']; | ||
// Execute the command | ||
this.db.command(commandObject, callback); | ||
this.db.command(commandObject, options, callback); | ||
} | ||
@@ -1499,3 +1559,8 @@ | ||
* | ||
* @param {Array|Objects} pipline a pipleline containing all the object for the execution. | ||
* Options | ||
* - **readPreference** {String}, the preferred read preference ((Server.PRIMARY, Server.PRIMARY_PREFERRED, Server.SECONDARY, Server.SECONDARY_PREFERRED, Server.NEAREST). | ||
* - **explain** {Boolean}, return the query plan for the aggregation pipeline instead of the results. | ||
* | ||
* @param {Array} array containing all the aggregation framework commands for the execution. | ||
* @param {Object} [options] additional options during update. | ||
* @param {Function} callback returns matching documents. | ||
@@ -1505,8 +1570,11 @@ * @return {null} | ||
*/ | ||
Collection.prototype.aggregate = function(pipeline, callback) { | ||
var args = Array.prototype.slice.call(arguments, 0); | ||
Collection.prototype.aggregate = function(pipeline, options, callback) { | ||
var args = Array.prototype.slice.call(arguments, 1); | ||
callback = args.pop(); | ||
options = args.length ? args.shift() : {}; | ||
var self = this; | ||
// Check if we have more than one argument then just make the pipeline | ||
if(!Array.isArray(pipeline)) return callback(new Error("pipline must be an array")); | ||
// Check if we have more than one argument then just make the pipeline | ||
// the remaining arguments | ||
@@ -1516,7 +1584,14 @@ if(args.length > 1) { | ||
} | ||
// Build the command | ||
var command = { aggregate : this.collectionName, pipeline : pipeline}; | ||
// Add all options | ||
var keys = Object.keys(options); | ||
// Add all options | ||
for(var i = 0; i < keys.length; i++) { | ||
command[keys[i]] = options[keys[i]]; | ||
} | ||
// Execute the command | ||
this.db.command(command, function(err, result) { | ||
this.db.command(command, options, function(err, result) { | ||
if(err) { | ||
@@ -1526,5 +1601,7 @@ callback(err); | ||
callback(self.db.wrap(result)); | ||
} else if(typeof result == 'object' && result['serverPipeline']) { | ||
callback(null, result); | ||
} else { | ||
callback(null, result.result); | ||
} | ||
} | ||
}); | ||
@@ -1538,2 +1615,3 @@ } | ||
* - **scale** {Number}, divide the returned sizes by scale value. | ||
* - **readPreference** {String}, the preferred read preference ((Server.PRIMARY, Server.PRIMARY_PREFERRED, Server.SECONDARY, Server.SECONDARY_PREFERRED, Server.NEAREST). | ||
* | ||
@@ -1560,3 +1638,3 @@ * @param {Objects} [options] options for the map reduce job. | ||
// Execute the command | ||
this.db.command(commandObject, callback); | ||
this.db.command(commandObject, options, callback); | ||
} | ||
@@ -1563,0 +1641,0 @@ |
@@ -50,7 +50,7 @@ var QueryCommand = require('./query_command').QueryCommand, | ||
DbCommand.createGetNonceCommand = function(db) { | ||
DbCommand.createGetNonceCommand = function(db, options) { | ||
return new DbCommand(db, db.databaseName + "." + DbCommand.SYSTEM_COMMAND_COLLECTION, QueryCommand.OPTS_NO_CURSOR_TIMEOUT, 0, -1, {'getnonce':1}, null); | ||
}; | ||
DbCommand.createAuthenticationCommand = function(db, username, password, nonce) { | ||
DbCommand.createAuthenticationCommand = function(db, username, password, nonce, authdb) { | ||
// Use node md5 generator | ||
@@ -64,7 +64,7 @@ var md5 = crypto.createHash('md5'); | ||
md5.update(nonce + username + hash_password); | ||
var key = md5.digest('hex'); | ||
var key = md5.digest('hex'); | ||
// Creat selector | ||
var selector = {'authenticate':1, 'user':username, 'nonce':nonce, 'key':key}; | ||
// Create db command | ||
return new DbCommand(db, db.databaseName + "." + DbCommand.SYSTEM_COMMAND_COLLECTION, QueryCommand.OPTS_NONE, 0, -1, selector, null); | ||
return new DbCommand(db, authdb + "." + DbCommand.SYSTEM_COMMAND_COLLECTION, QueryCommand.OPTS_NONE, 0, -1, selector, null); | ||
}; | ||
@@ -102,3 +102,3 @@ | ||
} | ||
// Final command | ||
// Final command | ||
var command = {'getlasterror':1}; | ||
@@ -111,3 +111,3 @@ // If we have an options Object let's merge in the fields (fsync/wtimeout/w) | ||
} | ||
// Execute command | ||
@@ -131,3 +131,3 @@ return new DbCommand(db, db.databaseName + "." + DbCommand.SYSTEM_COMMAND_COLLECTION, QueryCommand.OPTS_NO_CURSOR_TIMEOUT, 0, -1, command, null); | ||
var keys; | ||
// Get all the fields accordingly | ||
@@ -162,3 +162,3 @@ if (fieldOrSpec.constructor === String) { // 'type' | ||
} | ||
// Generate the index name | ||
@@ -173,3 +173,3 @@ var indexName = indexes.join("_"); | ||
options = options == null || typeof options == 'boolean' ? {} : options; | ||
// Add all the options | ||
@@ -181,3 +181,3 @@ var keys = Object.keys(options); | ||
} | ||
// If we don't have the unique property set on the selector | ||
@@ -189,4 +189,6 @@ if(selector['unique'] == null) selector['unique'] = finalUnique; | ||
DbCommand.logoutCommand = function(db, command_hash) { | ||
return new DbCommand(db, db.databaseName + "." + DbCommand.SYSTEM_COMMAND_COLLECTION, QueryCommand.OPTS_NO_CURSOR_TIMEOUT, 0, -1, command_hash, null); | ||
DbCommand.logoutCommand = function(db, command_hash, options) { | ||
var dbName = options != null && options['authdb'] != null ? options['authdb'] : db.databaseName; | ||
// Create logout command | ||
return new DbCommand(db, dbName + "." + DbCommand.SYSTEM_COMMAND_COLLECTION, QueryCommand.OPTS_NO_CURSOR_TIMEOUT, 0, -1, command_hash, null); | ||
} | ||
@@ -214,4 +216,8 @@ | ||
DbCommand.createAdminDbCommandSlaveOk = function(db, command_hash) { | ||
return new DbCommand(db, "admin." + DbCommand.SYSTEM_COMMAND_COLLECTION, QueryCommand.OPTS_NO_CURSOR_TIMEOUT | QueryCommand.OPTS_SLAVE, 0, -1, command_hash, null); | ||
}; | ||
DbCommand.createDbSlaveOkCommand = function(db, command_hash, options) { | ||
return new DbCommand(db, db.databaseName + "." + DbCommand.SYSTEM_COMMAND_COLLECTION, QueryCommand.OPTS_NO_CURSOR_TIMEOUT | QueryCommand.OPTS_SLAVE, 0, -1, command_hash, null, options); | ||
}; |
@@ -31,3 +31,3 @@ var BaseCommand = require('./base_command').BaseCommand, | ||
} | ||
// Make sure we don't get a null exception | ||
@@ -40,6 +40,10 @@ options = options == null ? {} : options; | ||
this.numberToReturn = numberToReturn; | ||
// Ensure we have no null query | ||
query = query == null ? {} : query; | ||
// Wrap query in the $query parameter so we can add read preferences for mongos | ||
this.query = query; | ||
this.returnFieldSelector = returnFieldSelector; | ||
this.db = db; | ||
// Let us defined on a command basis if we want functions to be serialized or not | ||
@@ -56,2 +60,38 @@ if(options['serializeFunctions'] != null && options['serializeFunctions']) { | ||
/* | ||
* Adds the read prefrence to the current command | ||
*/ | ||
QueryCommand.prototype.setMongosReadPreference = function(readPreference, tags) { | ||
// Force the slave ok flag to be set | ||
this.queryOptions |= QueryCommand.OPTS_SLAVE; | ||
// Backward compatibility, ensure $query only set on read preference so 1.8.X works | ||
if((readPreference != null || tags != null) && this.query['$query'] == null) { | ||
this.query = {'$query': this.query}; | ||
} | ||
// If we have no readPreference set and no tags, check if the slaveOk bit is set | ||
if(readPreference == null && tags == null) { | ||
// If we have a slaveOk bit set the read preference for MongoS | ||
if(this.queryOptions & QueryCommand.OPTS_SLAVE) { | ||
this.query['$readPreference'] = {mode: 'secondary'} | ||
} else { | ||
this.query['$readPreference'] = {mode: 'primary'} | ||
} | ||
} | ||
// Build read preference object | ||
if(typeof readPreference == 'object' && readPreference['_type'] == 'ReadPreference') { | ||
this.query['$readPreference'] = readPreference.toObject(); | ||
} else if(readPreference != null) { | ||
// Add the read preference | ||
this.query['$readPreference'] = {mode: readPreference}; | ||
// If we have tags let's add them | ||
if(tags != null) { | ||
this.query['$readPreference']['tags'] = tags; | ||
} | ||
} | ||
} | ||
/* | ||
struct { | ||
@@ -68,10 +108,11 @@ MsgHeader header; // standard message header | ||
QueryCommand.prototype.toBinary = function() { | ||
// Total length of the command | ||
var totalLengthOfCommand = 0; | ||
// Calculate total length of the document | ||
if(Buffer.isBuffer(this.query)) { | ||
totalLengthOfCommand = 4 + Buffer.byteLength(this.collectionName) + 1 + 4 + 4 + this.query.length + (4 * 4); | ||
totalLengthOfCommand = 4 + Buffer.byteLength(this.collectionName) + 1 + 4 + 4 + this.query.length + (4 * 4); | ||
} else { | ||
totalLengthOfCommand = 4 + Buffer.byteLength(this.collectionName) + 1 + 4 + 4 + this.db.bson.calculateObjectSize(this.query, this.serializeFunctions, true) + (4 * 4); | ||
totalLengthOfCommand = 4 + Buffer.byteLength(this.collectionName) + 1 + 4 + 4 + this.db.bson.calculateObjectSize(this.query, this.serializeFunctions, true) + (4 * 4); | ||
} | ||
// Calculate extra fields size | ||
@@ -90,3 +131,3 @@ if(this.returnFieldSelector != null && !(Buffer.isBuffer(this.returnFieldSelector))) { | ||
// Write the header information to the buffer | ||
_command[_index + 3] = (totalLengthOfCommand >> 24) & 0xff; | ||
_command[_index + 3] = (totalLengthOfCommand >> 24) & 0xff; | ||
_command[_index + 2] = (totalLengthOfCommand >> 16) & 0xff; | ||
@@ -98,3 +139,3 @@ _command[_index + 1] = (totalLengthOfCommand >> 8) & 0xff; | ||
// Write the request ID | ||
_command[_index + 3] = (this.requestId >> 24) & 0xff; | ||
_command[_index + 3] = (this.requestId >> 24) & 0xff; | ||
_command[_index + 2] = (this.requestId >> 16) & 0xff; | ||
@@ -111,3 +152,3 @@ _command[_index + 1] = (this.requestId >> 8) & 0xff; | ||
// Write the op_code for the command | ||
_command[_index + 3] = (QueryCommand.OP_QUERY >> 24) & 0xff; | ||
_command[_index + 3] = (QueryCommand.OP_QUERY >> 24) & 0xff; | ||
_command[_index + 2] = (QueryCommand.OP_QUERY >> 16) & 0xff; | ||
@@ -120,3 +161,3 @@ _command[_index + 1] = (QueryCommand.OP_QUERY >> 8) & 0xff; | ||
// Write the query options | ||
_command[_index + 3] = (this.queryOptions >> 24) & 0xff; | ||
_command[_index + 3] = (this.queryOptions >> 24) & 0xff; | ||
_command[_index + 2] = (this.queryOptions >> 16) & 0xff; | ||
@@ -130,6 +171,6 @@ _command[_index + 1] = (this.queryOptions >> 8) & 0xff; | ||
_index = _index + _command.write(this.collectionName, _index, 'utf8') + 1; | ||
_command[_index - 1] = 0; | ||
_command[_index - 1] = 0; | ||
// Write the number of documents to skip | ||
_command[_index + 3] = (this.numberToSkip >> 24) & 0xff; | ||
_command[_index + 3] = (this.numberToSkip >> 24) & 0xff; | ||
_command[_index + 2] = (this.numberToSkip >> 16) & 0xff; | ||
@@ -142,3 +183,3 @@ _command[_index + 1] = (this.numberToSkip >> 8) & 0xff; | ||
// Write the number of documents to return | ||
_command[_index + 3] = (this.numberToReturn >> 24) & 0xff; | ||
_command[_index + 3] = (this.numberToReturn >> 24) & 0xff; | ||
_command[_index + 2] = (this.numberToReturn >> 16) & 0xff; | ||
@@ -165,3 +206,3 @@ _command[_index + 1] = (this.numberToReturn >> 8) & 0xff; | ||
// Write the length to the document | ||
_command[_index + 3] = (documentLength >> 24) & 0xff; | ||
_command[_index + 3] = (documentLength >> 24) & 0xff; | ||
_command[_index + 2] = (documentLength >> 16) & 0xff; | ||
@@ -173,3 +214,3 @@ _command[_index + 1] = (documentLength >> 8) & 0xff; | ||
// Add terminating 0 for the object | ||
_command[_index - 1] = 0; | ||
_command[_index - 1] = 0; | ||
@@ -181,3 +222,3 @@ // Push field selector if available | ||
// Write the length to the document | ||
_command[_index + 3] = (documentLength >> 24) & 0xff; | ||
_command[_index + 3] = (documentLength >> 24) & 0xff; | ||
_command[_index + 2] = (documentLength >> 16) & 0xff; | ||
@@ -189,3 +230,3 @@ _command[_index + 1] = (documentLength >> 8) & 0xff; | ||
// Add terminating 0 for the object | ||
_command[_index - 1] = 0; | ||
_command[_index - 1] = 0; | ||
} | ||
@@ -200,6 +241,6 @@ } if(this.returnFieldSelector != null && Buffer.isBuffer(this.returnFieldSelector)) { | ||
// Copy the data into the current buffer | ||
object.copy(_command, _index); | ||
object.copy(_command, _index); | ||
// Write the length to the document | ||
_command[_index + 3] = (documentLength >> 24) & 0xff; | ||
_command[_index + 3] = (documentLength >> 24) & 0xff; | ||
_command[_index + 2] = (documentLength >> 16) & 0xff; | ||
@@ -211,5 +252,5 @@ _command[_index + 1] = (documentLength >> 8) & 0xff; | ||
// Add terminating 0 for the object | ||
_command[_index - 1] = 0; | ||
_command[_index - 1] = 0; | ||
} | ||
// Return finished command | ||
@@ -216,0 +257,0 @@ return _command; |
@@ -12,4 +12,4 @@ var utils = require('./connection_utils'), | ||
// Set up event emitter | ||
EventEmitter.call(this); | ||
// Keep all options for the socket in a specific collection allowing the user to specify the | ||
EventEmitter.call(this); | ||
// Keep all options for the socket in a specific collection allowing the user to specify the | ||
// Wished upon socket connection parameters | ||
@@ -23,3 +23,3 @@ this.socketOptions = typeof socketOptions === 'object' ? socketOptions : {}; | ||
this.minPoolSize = Math.floor(this.poolSize / 2) + 1; | ||
// Set default settings for the socket options | ||
@@ -34,13 +34,13 @@ utils.setIntegerParameter(this.socketOptions, 'timeout', 0); | ||
// Allows you to set a throttling bufferSize if you need to stop overflows | ||
utils.setIntegerParameter(this.socketOptions, 'bufferSize', 0); | ||
utils.setIntegerParameter(this.socketOptions, 'bufferSize', 0); | ||
// Internal structures | ||
this.openConnections = []; | ||
this.openConnections = []; | ||
// Assign connection id's | ||
this.connectionId = 0; | ||
// Current index for selection of pool connection | ||
this.currentConnectionIndex = 0; | ||
// The pool state | ||
this._poolState = 'disconnected'; | ||
this._poolState = 'disconnected'; | ||
// timeout control | ||
@@ -55,7 +55,7 @@ this._timeout = false; | ||
maxBsonSize = Connection.DEFAULT_MAX_BSON_SIZE; | ||
} | ||
} | ||
for(var i = 0; i < this.openConnections.length; i++) { | ||
this.openConnections[i].maxBsonSize = maxBsonSize; | ||
} | ||
} | ||
} | ||
@@ -66,3 +66,2 @@ | ||
return new function() { | ||
var connectionStatus = _self._poolState; | ||
// Create a new connection instance | ||
@@ -97,10 +96,8 @@ var connection = new Connection(_self.connectionId++, _self.socketOptions); | ||
// If we are already disconnected ignore the event | ||
if(connectionStatus != 'disconnected' && _self.listeners("error").length > 0) { | ||
_self.emit("error", err); | ||
if(_self._poolState != 'disconnected' && _self.listeners("error").length > 0) { | ||
_self.emit("error", err); | ||
} | ||
// Set disconnected | ||
connectionStatus = 'disconnected'; | ||
// Set disconnected | ||
_self._poolState = 'disconnected'; | ||
_self._poolState = 'disconnected'; | ||
// Stop | ||
@@ -113,10 +110,7 @@ _self.stop(); | ||
// If we are already disconnected ignore the event | ||
if(connectionStatus !== 'disconnected' && _self.listeners("close").length > 0) { | ||
_self.emit("close"); | ||
if(_self._poolState !== 'disconnected' && _self.listeners("close").length > 0) { | ||
_self.emit("close"); | ||
} | ||
// Set disconnected | ||
connectionStatus = 'disconnected'; | ||
// Set disconnected | ||
_self._poolState = 'disconnected'; | ||
_self._poolState = 'disconnected'; | ||
// Stop | ||
@@ -129,10 +123,7 @@ _self.stop(); | ||
// If we are already disconnected ignore the event | ||
if(connectionStatus !== 'disconnected' && _self.listeners("timeout").length > 0) { | ||
_self.emit("timeout", err); | ||
if(_self._poolState !== 'disconnected' && _self.listeners("timeout").length > 0) { | ||
_self.emit("timeout", err); | ||
} | ||
// Set disconnected | ||
connectionStatus = 'disconnected'; | ||
// Set disconnected | ||
_self._poolState = 'disconnected'; | ||
_self._poolState = 'disconnected'; | ||
// Stop | ||
@@ -145,12 +136,10 @@ _self.stop(); | ||
// If we are already disconnected ignore the event | ||
if(connectionStatus !== 'disconnected' && _self.listeners("parseError").length > 0) { | ||
_self.emit("parseError", new Error("parseError occured")); | ||
if(_self._poolState !== 'disconnected' && _self.listeners("parseError").length > 0) { | ||
_self.emit("parseError", new Error("parseError occured")); | ||
} | ||
// Set disconnected | ||
connectionStatus = 'disconnected'; | ||
_self.stop(); | ||
}); | ||
connection.on("message", function(message) { | ||
connection.on("message", function(message) { | ||
_self.emit("message", message); | ||
@@ -160,3 +149,3 @@ }); | ||
// Start connection in the next tick | ||
connection.start(); | ||
connection.start(); | ||
}(); | ||
@@ -167,3 +156,3 @@ } | ||
// Start method, will throw error if no listeners are available | ||
// Pass in an instance of the listener that contains the api for | ||
// Pass in an instance of the listener that contains the api for | ||
// finding callbacks for a given message etc. | ||
@@ -177,5 +166,5 @@ ConnectionPool.prototype.start = function() { | ||
} | ||
// Set pool state to connecting | ||
this._poolState = 'connecting'; | ||
this._poolState = 'connecting'; | ||
this._timeout = false; | ||
@@ -198,7 +187,7 @@ | ||
// Set disconnected | ||
this._poolState = 'disconnected'; | ||
this._poolState = 'disconnected'; | ||
// Clear all listeners if specified | ||
if(removeListeners) { | ||
this.removeAllEventListeners(); | ||
this.removeAllEventListeners(); | ||
} | ||
@@ -210,5 +199,5 @@ | ||
} | ||
// Clean up | ||
this.openConnections = []; | ||
this.openConnections = []; | ||
} | ||
@@ -215,0 +204,0 @@ |
@@ -11,3 +11,3 @@ var utils = require('./connection_utils'), | ||
// Set up event emitter | ||
EventEmitter.call(this); | ||
EventEmitter.call(this); | ||
// Store all socket options | ||
@@ -19,7 +19,7 @@ this.socketOptions = socketOptions ? socketOptions : {host:'localhost', port:27017}; | ||
this.connected = false; | ||
// | ||
// Connection parsing state | ||
// | ||
this.maxBsonSize = socketOptions.maxBsonSize ? socketOptions.maxBsonSize : Connection.DEFAULT_MAX_BSON_SIZE; | ||
this.maxBsonSize = socketOptions.maxBsonSize ? socketOptions.maxBsonSize : Connection.DEFAULT_MAX_BSON_SIZE; | ||
// Contains the current message bytes | ||
@@ -51,5 +51,5 @@ this.buffer = null; | ||
// Create a new stream | ||
this.connection = new net.Socket(); | ||
// Set options on the socket | ||
this.connection.setTimeout(this.socketOptions.timeout); | ||
this.connection = new net.Socket(); | ||
// Set timeout allowing backward compatibility to timeout if no connectTimeoutMS is set | ||
this.connection.setTimeout(this.socketOptions.connectTimeoutMS != null ? this.socketOptions.connectTimeoutMS : this.socketOptions.timeout); | ||
// Work around for 0.4.X | ||
@@ -63,5 +63,5 @@ if(process.version.indexOf("v0.4") == -1) this.connection.setNoDelay(this.socketOptions.noDelay); | ||
this.connection.setKeepAlive(false); | ||
} | ||
} | ||
} | ||
// Set up pair for tls with server, accept self-signed certificates as well | ||
@@ -72,3 +72,3 @@ var pair = this.pair = tls.createSecurePair(false); | ||
this.connection.pipe(this.pair.encrypted); | ||
// Setup clearText stream | ||
@@ -92,3 +92,3 @@ this.writeSteam = this.pair.cleartext; | ||
// Set options on the socket | ||
this.connection.setTimeout(this.socketOptions.timeout); | ||
this.connection.setTimeout(this.socketOptions.connectTimeoutMS != null ? this.socketOptions.connectTimeoutMS : this.socketOptions.timeout); | ||
// Work around for 0.4.X | ||
@@ -102,3 +102,3 @@ if(process.version.indexOf("v0.4") == -1) this.connection.setNoDelay(this.socketOptions.noDelay); | ||
this.connection.setKeepAlive(false); | ||
} | ||
} | ||
} | ||
@@ -117,3 +117,3 @@ | ||
this.connection.on("close", closeHandler(this)); | ||
} | ||
} | ||
} | ||
@@ -133,3 +133,3 @@ | ||
var binaryCommand = command[i].toBinary() | ||
if(binaryCommand.length > this.maxBsonSize) return callback(new Error("Document exceeds maximal allowed bson size of " + this.maxBsonSize + " bytes")); | ||
if(!this.socketOptions['disableDriverBSONSizeCheck'] && binaryCommand.length > this.maxBsonSize) return callback(new Error("Document exceeds maximal allowed bson size of " + this.maxBsonSize + " bytes")); | ||
if(this.logger != null && this.logger.doDebug) this.logger.debug("writing command to mongodb", binaryCommand); | ||
@@ -140,8 +140,8 @@ var r = this.writeSteam.write(binaryCommand); | ||
var binaryCommand = command.toBinary() | ||
if(binaryCommand.length > this.maxBsonSize) return callback(new Error("Document exceeds maximal allowed bson size of " + this.maxBsonSize + " bytes")); | ||
if(!this.socketOptions['disableDriverBSONSizeCheck'] && binaryCommand.length > this.maxBsonSize) return callback(new Error("Document exceeds maximal allowed bson size of " + this.maxBsonSize + " bytes")); | ||
if(this.logger != null && this.logger.doDebug) this.logger.debug("writing command to mongodb", binaryCommand); | ||
var r = this.writeSteam.write(binaryCommand); | ||
} | ||
} catch (err) { | ||
if(typeof callback === 'function') callback(err); | ||
} | ||
} catch (err) { | ||
if(typeof callback === 'function') callback(err); | ||
} | ||
@@ -151,3 +151,3 @@ } | ||
// Force the closure of the connection | ||
Connection.prototype.close = function() { | ||
Connection.prototype.close = function() { | ||
// clear out all the listeners | ||
@@ -162,12 +162,12 @@ resetHandlers(this, true); | ||
// Reset all handlers | ||
var resetHandlers = function(self, clearListeners) { | ||
var resetHandlers = function(self, clearListeners) { | ||
self.eventHandlers = {error:[], connect:[], close:[], end:[], timeout:[], parseError:[], message:[]}; | ||
// If we want to clear all the listeners | ||
if(clearListeners && self.connection != null) { | ||
var keys = Object.keys(self.eventHandlers); | ||
var keys = Object.keys(self.eventHandlers); | ||
// Remove all listeners | ||
for(var i = 0; i < keys.length; i++) { | ||
self.connection.removeAllListeners(keys[i]); | ||
} | ||
} | ||
} | ||
@@ -182,5 +182,7 @@ } | ||
var connectHandler = function(self) { | ||
return function() { | ||
return function() { | ||
// Set connected | ||
self.connected = true; | ||
// Now that we are connected set the socket timeout | ||
self.connection.setTimeout(self.socketOptions.socketTimeoutMS != null ? self.socketOptions.socketTimeoutMS : self.socketOptions.timeout); | ||
// Emit the connect event with no error | ||
@@ -215,3 +217,3 @@ self.emit("connect", null, self); | ||
data = data.slice(remainingBytesToRead); | ||
// Emit current complete message | ||
@@ -226,6 +228,6 @@ try { | ||
// Emit the buffer | ||
self.emit("message", emitBuffer, self); | ||
self.emit("message", emitBuffer, self); | ||
} catch(err) { | ||
var errorObject = {err:"socketHandler", trace:err, bin:buffer, parseState:{ | ||
sizeOfMessage:self.sizeOfMessage, | ||
sizeOfMessage:self.sizeOfMessage, | ||
bytesRead:self.bytesRead, | ||
@@ -241,6 +243,6 @@ stubBuffer:self.stubBuffer}}; | ||
// size of the message (< 4 bytes) | ||
if(self.stubBuffer != null && self.stubBuffer.length > 0) { | ||
if(self.stubBuffer != null && self.stubBuffer.length > 0) { | ||
// If we have enough bytes to determine the message size let's do it | ||
if(self.stubBuffer.length + data.length > 4) { | ||
if(self.stubBuffer.length + data.length > 4) { | ||
// Prepad the data | ||
@@ -277,5 +279,5 @@ var newData = new Buffer(self.stubBuffer.length + data.length); | ||
var errorObject = {err:"socketHandler", trace:'', bin:self.buffer, parseState:{ | ||
sizeOfMessage:sizeOfMessage, | ||
bytesRead:self.bytesRead, | ||
stubBuffer:self.stubBuffer}}; | ||
sizeOfMessage: sizeOfMessage, | ||
bytesRead: self.bytesRead, | ||
stubBuffer: self.stubBuffer}}; | ||
if(self.logger != null && self.logger.doError) self.logger.error("parseError", errorObject); | ||
@@ -300,3 +302,3 @@ // We got a parse Error fire it off then keep going | ||
data = new Buffer(0); | ||
} else if(sizeOfMessage > 4 && sizeOfMessage < self.maxBsonSize && sizeOfMessage == data.length) { | ||
@@ -313,6 +315,6 @@ try { | ||
// Emit the message | ||
self.emit("message", emitBuffer, self); | ||
self.emit("message", emitBuffer, self); | ||
} catch (err) { | ||
var errorObject = {err:"socketHandler", trace:err, bin:self.buffer, parseState:{ | ||
sizeOfMessage:self.sizeOfMessage, | ||
sizeOfMessage:self.sizeOfMessage, | ||
bytesRead:self.bytesRead, | ||
@@ -323,8 +325,8 @@ stubBuffer:self.stubBuffer}}; | ||
self.emit("parseError", errorObject, self); | ||
} | ||
} | ||
} else if(sizeOfMessage <= 4 || sizeOfMessage > self.maxBsonSize) { | ||
var errorObject = {err:"socketHandler", trace:null, bin:data, parseState:{ | ||
sizeOfMessage:sizeOfMessage, | ||
sizeOfMessage:sizeOfMessage, | ||
bytesRead:0, | ||
buffer:null, | ||
buffer:null, | ||
stubBuffer:null}}; | ||
@@ -335,3 +337,3 @@ if(self.logger != null && self.logger.doError) self.logger.error("parseError", errorObject); | ||
// Clear out the state of the parser | ||
// Clear out the state of the parser | ||
self.buffer = null; | ||
@@ -353,3 +355,3 @@ self.sizeOfMessage = 0; | ||
// Copy rest of message | ||
data = data.slice(sizeOfMessage); | ||
data = data.slice(sizeOfMessage); | ||
// Emit the message | ||
@@ -359,3 +361,3 @@ self.emit("message", emitBuffer, self); | ||
var errorObject = {err:"socketHandler", trace:err, bin:self.buffer, parseState:{ | ||
sizeOfMessage:sizeOfMessage, | ||
sizeOfMessage:sizeOfMessage, | ||
bytesRead:self.bytesRead, | ||
@@ -368,3 +370,3 @@ stubBuffer:self.stubBuffer}}; | ||
} | ||
} | ||
} else { | ||
@@ -379,3 +381,3 @@ // Create a buffer that contains the space for the non-complete message | ||
} | ||
} | ||
} | ||
} | ||
@@ -390,7 +392,7 @@ } | ||
// Emit end event | ||
self.emit("end", {err: 'connection received Fin packet from [' + self.socketOptions.host + ':' + self.socketOptions.port + ']'}, self); | ||
self.emit("end", {err: 'connection received Fin packet from [' + self.socketOptions.host + ':' + self.socketOptions.port + ']'}, self); | ||
} | ||
} | ||
var timeoutHandler = function(self) { | ||
var timeoutHandler = function(self) { | ||
return function() { | ||
@@ -406,3 +408,3 @@ self.emit("timeout", {err: 'connection to [' + self.socketOptions.host + ':' + self.socketOptions.port + '] timed out'}, self); | ||
var errorHandler = function(self) { | ||
var errorHandler = function(self) { | ||
return function(err) { | ||
@@ -417,5 +419,5 @@ // Set connected to false | ||
var closeHandler = function(self) { | ||
return function(hadError) { | ||
return function(hadError) { | ||
// If we have an error during the connection phase | ||
if(hadError && !self.connected) { | ||
if(hadError && !self.connected) { | ||
// Set disconnected | ||
@@ -429,3 +431,3 @@ self.connected = false; | ||
// Emit close | ||
self.emit("close", {err: 'connection closed to [' + self.socketOptions.host + ':' + self.socketOptions.port + ']'}, self); | ||
self.emit("close", {err: 'connection closed to [' + self.socketOptions.host + ':' + self.socketOptions.port + ']'}, self); | ||
} | ||
@@ -432,0 +434,0 @@ } |
var Connection = require('./connection').Connection, | ||
ReadPreference = require('./read_preference').ReadPreference, | ||
DbCommand = require('../commands/db_command').DbCommand, | ||
@@ -8,3 +9,3 @@ MongoReply = require('../responses/mongo_reply').MongoReply, | ||
inspect = require('util').inspect, | ||
Server = require('./server').Server, | ||
Server = require('./server').Server, | ||
PingStrategy = require('./strategies/ping_strategy').PingStrategy, | ||
@@ -28,16 +29,19 @@ StatisticsStrategy = require('./strategies/statistics_strategy').StatisticsStrategy; | ||
* Options | ||
* - **ha** {Boolean, default:false}, turn on high availability. | ||
* - **ha** {Boolean, default:true}, turn on high availability. | ||
* - **haInterval** {Number, default:2000}, time between each replicaset status check. | ||
* - **reconnectWait** {Number, default:1000}, time to wait in miliseconds before attempting reconnect. | ||
* - **retries** {Number, default:30}, number of times to attempt a replicaset reconnect. | ||
* - **rs_name** {String}, the name of the replicaset to connect to. | ||
* - **readPreference** {String}, the prefered read preference (Server.READ_PRIMARY, Server.READ_SECONDARY, Server.READ_SECONDARY_ONLY). | ||
* - **read_secondary** {Boolean, deprecated}, allow reads from secondary. | ||
* - **rs_name** {String}, the name of the replicaset to connect to. | ||
* - **socketOptions** {Object, default:null}, an object containing socket options to use (noDelay:(boolean), keepAlive:(number), connectTimeoutMS:(number), socketTimeoutMS:(number)) | ||
* - **readPreference** {String}, the prefered read preference (ReadPreference.PRIMARY, ReadPreference.PRIMARY_PREFERRED, ReadPreference.SECONDARY, ReadPreference.SECONDARY_PREFERRED, ReadPreference.NEAREST). | ||
* - **strategy** {String, default:null}, selection strategy for reads choose between (ping and statistical, default is round-robin) | ||
* - **secondaryAcceptableLatencyMS** {Number, default:15}, sets the range of servers to pick when using NEAREST (lowest ping ms + the latency fence, ex: range of 1 to (1 + 15) ms) | ||
* | ||
* @class Represents a Replicaset Configuration | ||
* @param {Array} list of server objects participating in the replicaset. | ||
* @param {Object} [options] additional options for the collection. | ||
* @param {Array} list of server objects participating in the replicaset. | ||
* @param {Object} [options] additional options for the replicaset connection. | ||
*/ | ||
var ReplSet = exports.ReplSet = function(servers, options) { | ||
this.count = 0; | ||
// Set up basic | ||
@@ -50,2 +54,8 @@ if(!(this instanceof ReplSet)) | ||
// Ensure no Mongos's | ||
for(var i = 0; i < servers.length; i++) { | ||
if(!(servers[i] instanceof Server)) throw new Error("list of servers must be of type Server"); | ||
} | ||
// Just reference for simplicity | ||
var self = this; | ||
@@ -62,6 +72,6 @@ // Contains the master server entry | ||
this.closedConnectionCount = 0; | ||
this._used = false; | ||
this._used = false; | ||
// Default poolSize for new server instances | ||
this.poolSize = this.options.poolSize == null ? 1 : this.options.poolSize; | ||
this.poolSize = this.options.poolSize == null ? 5 : this.options.poolSize; | ||
this._currentServerChoice = 0; | ||
@@ -82,27 +92,28 @@ | ||
this.recordQueryStats = false; | ||
// Get the readPreference | ||
var readPreference = this.options['readPreference']; | ||
// Read preference setting | ||
var readPreference = this.options['readPreference']; | ||
// Validate correctness of Read preferences | ||
if(readPreference != null) { | ||
if(readPreference != Server.READ_PRIMARY && readPreference != Server.READ_SECONDARY_ONLY | ||
&& readPreference != Server.READ_SECONDARY) { | ||
throw new Error("Illegal readPreference mode specified, " + readPreference); | ||
if(readPreference != ReadPreference.PRIMARY && readPreference != ReadPreference.PRIMARY_PREFERRED | ||
&& readPreference != ReadPreference.SECONDARY && readPreference != ReadPreference.SECONDARY_PREFERRED | ||
&& readPreference != ReadPreference.NEAREST && typeof readPreference != 'object' && readPreference['_type'] != 'ReadPreference') { | ||
throw new Error("Illegal readPreference mode specified, " + readPreference); | ||
} | ||
// Set read Preference | ||
this._readPreference = readPreference; | ||
} else { | ||
this._readPreference = null; | ||
this._readPreference = null; | ||
} | ||
// Strategy for picking a secondary | ||
// this.strategy = this.options['strategy'] == null ? 'statistical' : this.options['strategy']; | ||
this.secondaryAcceptableLatencyMS = this.options['secondaryAcceptableLatencyMS'] == null ? 15 : this.options['secondaryAcceptableLatencyMS']; | ||
this.strategy = this.options['strategy']; | ||
// Make sure strategy is one of the two allowed | ||
if(this.strategy != null && (this.strategy != 'ping' && this.strategy != 'statistical')) throw new Error("Only ping or statistical strategies allowed"); | ||
if(this.strategy != null && (this.strategy != 'ping' && this.strategy != 'statistical')) throw new Error("Only ping or statistical strategies allowed"); | ||
// Let's set up our strategy object for picking secodaries | ||
if(this.strategy == 'ping') { | ||
// Create a new instance | ||
this.strategyInstance = new PingStrategy(this); | ||
this.strategyInstance = new PingStrategy(this, this.secondaryAcceptableLatencyMS); | ||
} else if(this.strategy == 'statistical') { | ||
@@ -113,14 +124,14 @@ // Set strategy as statistical | ||
this.enableRecordQueryStats(true); | ||
} | ||
} | ||
// Set default connection pool options | ||
this.socketOptions = this.options.socketOptions != null ? this.options.socketOptions : {}; | ||
this.socketOptions = this.options.socketOptions != null ? this.options.socketOptions : {}; | ||
// Set up logger if any set | ||
this.logger = this.options.logger != null | ||
&& (typeof this.options.logger.debug == 'function') | ||
&& (typeof this.options.logger.error == 'function') | ||
&& (typeof this.options.logger.debug == 'function') | ||
this.logger = this.options.logger != null | ||
&& (typeof this.options.logger.debug == 'function') | ||
&& (typeof this.options.logger.error == 'function') | ||
&& (typeof this.options.logger.debug == 'function') | ||
? this.options.logger : {error:function(message, object) {}, log:function(message, object) {}, debug:function(message, object) {}}; | ||
// Ensure all the instances are of type server and auto_reconnect is false | ||
@@ -142,9 +153,16 @@ if(!Array.isArray(servers) || servers.length == 0) { | ||
} | ||
} | ||
} | ||
// Enabled ha | ||
this.haEnabled = this.options['ha'] == null ? false : this.options['ha']; | ||
this.haEnabled = this.options['ha'] == null ? true : this.options['ha']; | ||
// How often are we checking for new servers in the replicaset | ||
this.replicasetStatusCheckInterval = this.options['haInterval'] == null ? 2000 : this.options['haInterval']; | ||
this.replicasetStatusCheckInterval = this.options['haInterval'] == null ? 1000 : this.options['haInterval']; | ||
this._replicasetTimeoutId = null; | ||
// Connection timeout | ||
this._connectTimeoutMS = 1000; | ||
// Current list of servers to test | ||
this.pingCandidateServers = []; | ||
// Last replicaset check time | ||
this.lastReplicaSetTime = new Date().getTime(); | ||
}; | ||
@@ -165,3 +183,3 @@ | ||
// Ensure slaveOk is correct for secodnaries read preference and tags | ||
if((this._readPreference == Server.READ_SECONDARY || this._readPreference == Server.READ_SECONDARY_ONLY) | ||
if((this._readPreference == ReadPreference.SECONDARY_PREFERRED || this._readPreference == ReadPreference.SECONDARY) | ||
|| (this._readPreference != null && typeof this._readPreference == 'object')) { | ||
@@ -173,6 +191,6 @@ this.slaveOk = true; | ||
/** | ||
* Return the used state | ||
* @ignore | ||
*/ | ||
// Return the used state | ||
ReplSet.prototype._isUsed = function() { | ||
ReplSet.prototype._isUsed = function() { | ||
return this._used; | ||
@@ -184,5 +202,5 @@ } | ||
*/ | ||
ReplSet.prototype.setTarget = function(target) { | ||
this.target = target; | ||
}; | ||
ReplSet.prototype.isMongos = function() { | ||
return false; | ||
} | ||
@@ -200,3 +218,3 @@ /** | ||
*/ | ||
Server.prototype.isSetMember = function() { | ||
ReplSet.prototype.isSetMember = function() { | ||
return false; | ||
@@ -209,3 +227,3 @@ } | ||
ReplSet.prototype.isPrimary = function(config) { | ||
return this.readSecondary && this.secondaries.length > 0 ? false : true; | ||
return this.readSecondary && Object.keys(this._state.secondaries).length > 0 ? false : true; | ||
} | ||
@@ -220,52 +238,12 @@ | ||
* @ignore | ||
*/ | ||
// Clean up dead connections | ||
var cleanupConnections = ReplSet.cleanupConnections = function(connections, addresses, byTags) { | ||
// Ensure we don't have entries in our set with dead connections | ||
var keys = Object.keys(connections); | ||
for(var i = 0; i < keys.length; i++) { | ||
var server = connections[keys[i]]; | ||
// If it's not connected remove it from the list | ||
if(!server.isConnected()) { | ||
// Remove from connections and addresses | ||
delete connections[keys[i]]; | ||
delete addresses[keys[i]]; | ||
// Clean up tags if needed | ||
if(server.tags != null && typeof server.tags === 'object') { | ||
cleanupTags(server, byTags); | ||
} | ||
} | ||
} | ||
} | ||
/** | ||
* @ignore | ||
*/ | ||
var cleanupTags = ReplSet._cleanupTags = function(server, byTags) { | ||
var serverTagKeys = Object.keys(server.tags); | ||
// Iterate over all server tags and remove any instances for that tag that matches the current | ||
// server | ||
for(var i = 0; i < serverTagKeys.length; i++) { | ||
// Fetch the value for the tag key | ||
var value = server.tags[serverTagKeys[i]]; | ||
// If we got an instance of the server | ||
if(byTags[serverTagKeys[i]] != null | ||
&& byTags[serverTagKeys[i]][value] != null | ||
&& Array.isArray(byTags[serverTagKeys[i]][value])) { | ||
// List of clean servers | ||
var cleanInstances = []; | ||
// We got instances for the particular tag set | ||
var instances = byTags[serverTagKeys[i]][value]; | ||
for(var j = 0, jlen = instances.length; j < jlen; j++) { | ||
var serverInstance = instances[j]; | ||
// If we did not find an instance add it to the clean instances | ||
if((serverInstance.host + ":" + serverInstance.port) !== (server.host + ":" + server.port)) { | ||
cleanInstances.push(serverInstance); | ||
} | ||
} | ||
// Update the byTags list | ||
byTags[serverTagKeys[i]][value] = cleanInstances; | ||
} | ||
* @private | ||
**/ | ||
ReplSet.prototype._checkReplicaSet = function() { | ||
if(!this.haEnabled) return false; | ||
var currentTime = new Date().getTime(); | ||
if((currentTime - this.lastReplicaSetTime) >= this.replicasetStatusCheckInterval) { | ||
this.lastReplicaSetTime = currentTime; | ||
return true; | ||
} else { | ||
return false; | ||
} | ||
@@ -310,3 +288,2 @@ } | ||
*/ | ||
// Ensure no callback is left hanging when we have an error | ||
var __executeAllCallbacksWithError = function(dbInstance, error) { | ||
@@ -325,7 +302,112 @@ var keys = Object.keys(dbInstance._callBackStore._notReplied); | ||
* @ignore | ||
* @private | ||
*/ | ||
ReplSet.prototype._validateReplicaset = function(result, auths) { | ||
var self = this; | ||
// For each member we need to check if we have a new connection that needs to be established | ||
var members = result['documents'][0]['members']; | ||
// Get members | ||
var members = Array.isArray(result['documents'][0]['members']) ? result['documents'][0]['members'] : []; | ||
// The total members we check | ||
var serversToConnectList = {}; | ||
// Iterate over all the members and see if we need to reconnect | ||
for(var i = 0, jlen = members.length; i < jlen; i++) { | ||
var member = members[i]; | ||
if(member['health'] != 0 | ||
&& null == self._state['addresses'][member['name']] | ||
&& null == serversToConnectList[member['name']]) { | ||
// Split the server string | ||
var parts = member.name.split(/:/); | ||
if(parts.length == 1) { | ||
parts = [parts[0], Connection.DEFAULT_PORT]; | ||
} | ||
// Default empty socket options object | ||
var socketOptions = {host:parts[0], port:parseInt(parts[1], 10)}; | ||
// If a socket option object exists clone it | ||
if(self.socketOptions != null) { | ||
var keys = Object.keys(self.socketOptions); | ||
for(var k = 0; k < keys.length;k++) socketOptions[keys[i]] = self.socketOptions[keys[i]]; | ||
} | ||
// Create a new server instance | ||
var newServer = new Server(parts[0], parseInt(parts[1], 10), {auto_reconnect:false, 'socketOptions':socketOptions | ||
, logger:self.logger, ssl:self.ssl, poolSize:self.poolSize}); | ||
// Set the replicaset instance | ||
newServer.replicasetInstance = self; | ||
// Add handlers | ||
newServer.on("close", self.closeHandler); | ||
newServer.on("timeout", self.timeoutHandler); | ||
newServer.on("error", self.errorHandler); | ||
// Add to list of server connection target | ||
serversToConnectList[member['name']] = newServer; | ||
} else if(member['stateStr'] == 'PRIMARY' && self._state.master['name'] != member['name']) { | ||
// Delete master record so we can rediscover it | ||
delete self._state['addresses'][self._state.master['name']]; | ||
// Update inormation on new primary | ||
var newMaster = self._state.addresses[member['name']]; | ||
newMaster.isMasterDoc.ismaster = true; | ||
newMaster.isMasterDoc.secondary = false; | ||
self._state.master = newMaster; | ||
// Remove from secondaries | ||
delete self._state.secondaries[member['name']]; | ||
newMaster = null; | ||
} | ||
} | ||
// All servers we want to connect to | ||
var serverKeys = Object.keys(serversToConnectList); | ||
// For all remaining servers on the list connect | ||
while(serverKeys.length > 0) { | ||
var _serverKey = serverKeys.pop(); | ||
// Fetch the server | ||
var _server = serversToConnectList[_serverKey]; | ||
// Add a new server to the total number of servers that need to initialized before we are done | ||
var newServerCallback = self.connectionHandler(_server); | ||
// Connect To the new server | ||
_server.connect(self.db, {returnIsMasterResults: true, eventReceiver:newServer}, function(err, result, _server) { | ||
if(err == null && result != null) { | ||
// Fetch the myState | ||
var document = result.documents[0]; | ||
// Remove from list until | ||
if(document.ismaster || document.secondary || document.arbiterOnly) { | ||
process.nextTick(function() { | ||
// Apply any auths | ||
if(Array.isArray(auths) && auths.length > 0) { | ||
// Get number of auths we need to execute | ||
var numberOfAuths = auths.length; | ||
// Apply all auths | ||
for(var i = 0; i < auths.length; i++) { | ||
self.db.authenticate(auths[i].username, auths[i].password, {'authdb':auths[i].authdb}, function(err, authenticated) { | ||
numberOfAuths = numberOfAuths - 1; | ||
// If we have no more authentications to replay | ||
if(numberOfAuths == 0) { | ||
newServerCallback(err, result, _server); | ||
} | ||
}); | ||
} | ||
} else { | ||
newServerCallback(err, result, _server); | ||
} | ||
}); | ||
} else { | ||
_server.close(); | ||
} | ||
} else { | ||
_server.close(); | ||
} | ||
}); | ||
} | ||
} | ||
/** | ||
* @ignore | ||
*/ | ||
ReplSet.prototype.connect = function(parent, options, callback) { | ||
var self = this; | ||
var dateStamp = new Date().getTime(); | ||
if('function' === typeof options) callback = options, options = {}; | ||
if('function' === typeof options) callback = options, options = {}; | ||
if(options == null) options = {}; | ||
@@ -347,8 +429,13 @@ if(!('function' === typeof callback)) callback = null; | ||
// Clean up state | ||
replSetSelf._state = {'master':null, 'secondaries':{}, 'arbiters':{}, 'passives':{}, 'errors':{}, 'addresses':{}, 'byTags':{}, 'setName':null, 'errorMessages':[], 'members':[]}; | ||
replSetSelf._state = {'master':null, 'secondaries':{}, 'arbiters':{}, 'passives':{} | ||
, 'errors':{}, 'addresses':{}, 'setName':null, 'errorMessages':[], 'members':[]}; | ||
// Create a connection handler | ||
// self.connectionHandler = null != self.connectionHandler ? self.connectionHandler : function(instanceServer) { | ||
self.connectionHandler = function(instanceServer) { | ||
return function(err, result) { | ||
self.count = self.count + 1; | ||
// If we found a master call it at the end | ||
var masterCallback = null; | ||
// Remove a server from the list of intialized servers we need to perform | ||
@@ -360,6 +447,6 @@ self._numberOfServersLeftToInitialize = self._numberOfServersLeftToInitialize - 1; | ||
} | ||
// Add enable query information | ||
instanceServer.enableRecordQueryStats(replSetSelf.recordQueryStats); | ||
if(err == null && result.documents[0].hosts != null) { | ||
@@ -379,9 +466,9 @@ // Fetch the isMaster command result | ||
var primary = document.primary; | ||
// Ensure we are keying on the same name for lookups as mongodb might return | ||
// dns name and the driver is using ip's | ||
// Ensure we are keying on the same name for lookups as mongodb might return | ||
// dns name and the driver is using ip's | ||
// Rename the connection so we are keying on the name used by mongod | ||
var userProvidedServerString = instanceServer.host + ":" + instanceServer.port; | ||
var userProvidedServerString = instanceServer.host + ":" + instanceServer.port; | ||
var me = document.me || userProvidedServerString; | ||
// If we have user provided entries already, switch them to avoid additional | ||
@@ -404,12 +491,12 @@ // open connections | ||
replSetSelf._state['arbiters'][me] = server; | ||
} | ||
} | ||
// Set name of the server | ||
server.name = me; | ||
// Add the existing one to the replicaset list of addresses | ||
replSetSelf._state['addresses'][me] = server; | ||
replSetSelf._state['addresses'][me] = server; | ||
} else { | ||
instanceServer.name = me; | ||
} | ||
// Only add server to our internal list if it's a master, secondary or arbiter | ||
@@ -427,2 +514,5 @@ if(isMaster == true || secondary == true || arbiterOnly == true) { | ||
__executeAllCallbacksWithError(parent, err); | ||
// Set the parent | ||
if(typeof parent.openCalled != 'undefined') | ||
parent.openCalled = false; | ||
// Ensure single callback only | ||
@@ -434,3 +524,3 @@ if(callback != null) { | ||
// Return the error | ||
internalCallback(err, null); | ||
internalCallback(err, null, replSetSelf); | ||
} else { | ||
@@ -444,10 +534,9 @@ // If the parent has listeners trigger an event | ||
} | ||
} | ||
} | ||
// Check if this is the primary server, then disconnect otherwise keep going | ||
if(replSetSelf._state.master != null) { | ||
var primaryAddress = replSetSelf._state.master.host + ":" + replSetSelf._state.master.port; | ||
// var errorServerAddress = server.host + ":" + server.port; | ||
var errorServerAddress = server.name; | ||
// Only shut down the set if we have a primary server error | ||
@@ -466,8 +555,8 @@ if(primaryAddress == errorServerAddress) { | ||
delete replSetSelf._state.passives[errorServerAddress]; | ||
} | ||
} | ||
// Check if we are reading from Secondary only | ||
if(replSetSelf._readPreference == Server.READ_SECONDARY_ONLY && Object.keys(replSetSelf._state.secondaries).length == 0) { | ||
if(replSetSelf._readPreference == ReadPreference.SECONDARY && Object.keys(replSetSelf._state.secondaries).length == 0) { | ||
closeServers(); | ||
} | ||
} | ||
} | ||
@@ -489,2 +578,5 @@ } else { | ||
__executeAllCallbacksWithError(parent, err); | ||
// Set the parent | ||
if(typeof parent.openCalled != 'undefined') | ||
parent.openCalled = false; | ||
// Ensure single callback only | ||
@@ -496,3 +588,3 @@ if(callback != null) { | ||
// Return the error | ||
internalCallback(new Error("connection timed out"), null); | ||
internalCallback(new Error("connection timed out"), null, replSetSelf); | ||
} else { | ||
@@ -506,4 +598,4 @@ // If the parent has listeners trigger an event | ||
} | ||
} | ||
} | ||
// Check if this is the primary server, then disconnect otherwise keep going | ||
@@ -513,3 +605,3 @@ if(replSetSelf._state.master != null) { | ||
var errorServerAddress = server.name; | ||
// Only shut down the set if we have a primary server error | ||
@@ -528,8 +620,8 @@ if(primaryAddress == errorServerAddress) { | ||
delete replSetSelf._state.passives[errorServerAddress]; | ||
} | ||
} | ||
// Check if we are reading from Secondary only | ||
if(replSetSelf._readPreference == Server.READ_SECONDARY_ONLY && Object.keys(replSetSelf._state.secondaries).length == 0) { | ||
if(replSetSelf._readPreference == ReadPreference.SECONDARY && Object.keys(replSetSelf._state.secondaries).length == 0) { | ||
closeServers(); | ||
} | ||
} | ||
} | ||
@@ -544,3 +636,3 @@ } else { | ||
var closeServers = function() { | ||
// Set the state to disconnected | ||
// Set the state to disconnected | ||
parent._state = 'disconnected'; | ||
@@ -552,2 +644,5 @@ // Shut down the replicaset for now and Fire off all the callbacks sitting with no reply | ||
__executeAllCallbacksWithError(parent, err); | ||
// Set the parent | ||
if(typeof parent.openCalled != 'undefined') | ||
parent.openCalled = false; | ||
// Ensure single callback only | ||
@@ -559,3 +654,3 @@ if(callback != null) { | ||
// Return the error | ||
internalCallback(err, null); | ||
internalCallback(err, null, replSetSelf); | ||
} else { | ||
@@ -569,4 +664,4 @@ // If the parent has listeners trigger an event | ||
} | ||
} | ||
} | ||
// Check if this is the primary server, then disconnect otherwise keep going | ||
@@ -576,4 +671,2 @@ if(replSetSelf._state.master != null) { | ||
var errorServerAddress = server.name; | ||
// var errorServerAddress = server.host + ":" + server.port; | ||
// Only shut down the set if we have a primary server error | ||
@@ -592,8 +685,8 @@ if(primaryAddress == errorServerAddress) { | ||
delete replSetSelf._state.passives[errorServerAddress]; | ||
} | ||
} | ||
// Check if we are reading from Secondary only | ||
if(replSetSelf._readPreference == Server.READ_SECONDARY_ONLY && Object.keys(replSetSelf._state.secondaries).length == 0) { | ||
if(replSetSelf._readPreference == ReadPreference.SECONDARY && Object.keys(replSetSelf._state.secondaries).length == 0) { | ||
closeServers(); | ||
} | ||
} | ||
} | ||
@@ -604,3 +697,3 @@ } else { | ||
} | ||
// Ensure we don't have duplicate handlers | ||
@@ -620,29 +713,2 @@ instanceServer.removeAllListeners("close"); | ||
// For each tag in tags let's add the instance Server to the list for that tag | ||
if(tags != null && typeof tags === 'object') { | ||
var tagKeys = Object.keys(tags); | ||
// For each tag file in the server add it to byTags | ||
for(var i = 0; i < tagKeys.length; i++) { | ||
var value = tags[tagKeys[i]]; | ||
// Check if we have a top level tag object | ||
if(replSetSelf._state.byTags[tagKeys[i]] == null) replSetSelf._state.byTags[tagKeys[i]] = {}; | ||
// For the value check if we have an array of server instances | ||
if(!Array.isArray(replSetSelf._state.byTags[tagKeys[i]][value])) replSetSelf._state.byTags[tagKeys[i]][value] = []; | ||
// Check that the instance is not already registered there | ||
var valueArray = replSetSelf._state.byTags[tagKeys[i]][value]; | ||
var found = false; | ||
// Iterate over all values | ||
for(var j = 0; j < valueArray.length; j++) { | ||
if(valueArray[j].host == instanceServer.host && valueArray[j].port == instanceServer.port) { | ||
found = true; | ||
break; | ||
} | ||
} | ||
// If it was not found push the instance server to the list | ||
if(!found) valueArray.push(instanceServer); | ||
} | ||
} | ||
// Remove from error list | ||
@@ -656,3 +722,3 @@ delete replSetSelf._state.errors[me]; | ||
if(replSetSelf.replicaSet == null) { | ||
replSetSelf._state.setName = setName; | ||
replSetSelf._state.setName = setName; | ||
} else if(replSetSelf.replicaSet != setName && replSetSelf._serverState != 'disconnected') { | ||
@@ -666,5 +732,5 @@ replSetSelf._state.errorMessages.push(new Error("configured mongodb replicaset does not match provided replicaset [" + setName + "] != [" + replSetSelf.replicaSet + "]")); | ||
// Return error message ignoring rest of calls | ||
return internalCallback(replSetSelf._state.errorMessages[0], parent); | ||
return internalCallback(replSetSelf._state.errorMessages[0], parent, replSetSelf); | ||
} | ||
// Let's add the server to our list of server types | ||
@@ -679,8 +745,12 @@ if(secondary == true && (passive == false || passive == null)) { | ||
replSetSelf._state.master = instanceServer; | ||
masterCallback = callback; | ||
callback = null; | ||
} else if(isMaster == false && primary != null && replSetSelf._state.addresses[primary]) { | ||
replSetSelf._state.master = replSetSelf._state.addresses[primary]; | ||
masterCallback = callback; | ||
callback = null; | ||
} | ||
// Let's go throught all the "possible" servers in the replicaset | ||
var candidateServers = hosts.concat(arbiters).concat(passives); | ||
var candidateServers = hosts.concat(arbiters).concat(passives); | ||
@@ -690,6 +760,6 @@ // If we have new servers let's add them | ||
// Fetch the server string | ||
var candidateServerString = candidateServers[i]; | ||
var candidateServerString = candidateServers[i]; | ||
// Add the server if it's not defined and not already errored out | ||
if(null == replSetSelf._state.addresses[candidateServerString] | ||
&& null == replSetSelf._state.errors[candidateServerString]) { | ||
&& null == replSetSelf._state.errors[candidateServerString]) { | ||
// Split the server string | ||
@@ -711,9 +781,11 @@ var parts = candidateServerString.split(/:/); | ||
socketOptions['host'] = parts[0]; | ||
socketOptions['port'] = parseInt(parts[1]); | ||
socketOptions['port'] = parseInt(parts[1], 10); | ||
// Set fast connect timeout | ||
socketOptions['connectTimeoutMS'] = replSetSelf._connectTimeoutMS | ||
// Create a new server instance | ||
var newServer = new Server(parts[0], parseInt(parts[1]), {auto_reconnect:false, 'socketOptions':socketOptions | ||
var newServer = new Server(parts[0], parseInt(parts[1], 10), {auto_reconnect:false, 'socketOptions':socketOptions | ||
, logger:replSetSelf.logger, ssl:replSetSelf.ssl, poolSize:replSetSelf.poolSize}); | ||
// Set the replicaset instance | ||
newServer.replicasetInstance = replSetSelf; | ||
newServer.replicasetInstance = replSetSelf; | ||
@@ -724,3 +796,3 @@ // Add handlers | ||
newServer.on("error", replSetSelf.errorHandler); | ||
// Add server to list, ensuring we don't get a cascade of request to the same server | ||
@@ -735,3 +807,3 @@ replSetSelf._state.addresses[candidateServerString] = newServer; | ||
} | ||
} | ||
} | ||
} else { | ||
@@ -742,5 +814,23 @@ // Remove the instance from out list of servers | ||
} else { | ||
instanceServer.close(); | ||
delete replSetSelf._state.addresses[instanceServer.host + ":" + instanceServer.port]; | ||
} | ||
// Check if we are ready in the next tick to allow more connections to be done | ||
// process.nextTick(function() { | ||
// Call back as we have a master letting the rest of the connections happen async | ||
if(masterCallback != null) { | ||
var internalCallback = masterCallback; | ||
masterCallback = null; | ||
// Fire open event | ||
process.nextTick(function() { | ||
// Emit the open event | ||
parent.emit("open", null, parent); | ||
}); | ||
internalCallback(null, parent, replSetSelf); | ||
} | ||
// }); | ||
// If done finish up | ||
@@ -762,8 +852,15 @@ if((self._numberOfServersLeftToInitialize == 0) && replSetSelf._serverState === 'connecting' && replSetSelf._state.errorMessages.length == 0) { | ||
callback = null; | ||
// Start up ha | ||
if(replSetSelf.haEnabled && null == replSetSelf._replicasetTimeoutId) { | ||
replSetSelf._replicasetTimeoutId = setTimeout(replSetSelf.replicasetCheckFunction, replSetSelf.replicasetStatusCheckInterval); | ||
// Fire open event | ||
process.nextTick(function() { | ||
// Emit on db parent | ||
parent.emit("fullsetup", null, parent); | ||
// Emit all servers done | ||
replSetSelf.emit("fullsetup", null, parent); | ||
}); | ||
// Callback | ||
if(typeof internalCallback == 'function') { | ||
internalCallback(null, parent, replSetSelf); | ||
} | ||
// Perform callback | ||
internalCallback(null, parent); | ||
}) | ||
@@ -774,8 +871,14 @@ } else { | ||
callback = null; | ||
// Start up ha | ||
if(replSetSelf.haEnabled && null == replSetSelf._replicasetTimeoutId) { | ||
replSetSelf._replicasetTimeoutId = setTimeout(replSetSelf.replicasetCheckFunction, replSetSelf.replicasetStatusCheckInterval); | ||
// Fire open event | ||
process.nextTick(function() { | ||
parent.emit("fullsetup", null, parent); | ||
// Emit all servers done | ||
replSetSelf.emit("fullsetup", null, parent); | ||
}); | ||
// Callback | ||
if(typeof internalCallback == 'function') { | ||
internalCallback(null, parent, replSetSelf); | ||
} | ||
// Perform callback | ||
internalCallback(null, parent); | ||
} | ||
@@ -792,8 +895,14 @@ } else if(replSetSelf.readSecondary == true && Object.keys(replSetSelf._state.secondaries).length > 0) { | ||
callback = null; | ||
// Start up ha | ||
if(replSetSelf.haEnabled && null == replSetSelf._replicasetTimeoutId) { | ||
replSetSelf._replicasetTimeoutId = setTimeout(replSetSelf.replicasetCheckFunction, replSetSelf.replicasetStatusCheckInterval); | ||
// Fire open event | ||
process.nextTick(function() { | ||
parent.emit("fullsetup", null, parent); | ||
// Emit all servers done | ||
replSetSelf.emit("fullsetup", null, parent) | ||
}); | ||
// Callback | ||
if(typeof internalCallback == 'function') { | ||
internalCallback(null, parent, replSetSelf); | ||
} | ||
// Perform callback | ||
internalCallback(null, parent); | ||
}) | ||
@@ -804,10 +913,16 @@ } else { | ||
callback = null; | ||
// Start up ha | ||
if(replSetSelf.haEnabled && null == replSetSelf._replicasetTimeoutId) { | ||
replSetSelf._replicasetTimeoutId = setTimeout(replSetSelf.replicasetCheckFunction, replSetSelf.replicasetStatusCheckInterval); | ||
// Fire open event | ||
process.nextTick(function() { | ||
parent.emit("fullsetup", null, parent); | ||
// Emit all servers done | ||
replSetSelf.emit("fullsetup", null, parent); | ||
}); | ||
// Callback | ||
if(typeof internalCallback == 'function') { | ||
internalCallback(null, parent, replSetSelf); | ||
} | ||
// Perform callback | ||
internalCallback(null, parent); | ||
} | ||
} else if(replSetSelf.readSecondary == true && Object.keys(replSetSelf._state.secondaries).length == 0) { | ||
} else if(replSetSelf.readSecondary == true && Object.keys(replSetSelf._state.secondaries).length == 0) { | ||
replSetSelf._serverState = 'disconnected'; | ||
@@ -820,3 +935,3 @@ // ensure no callbacks get called twice | ||
// Perform callback | ||
internalCallback(new Error("no secondary server found"), null); | ||
internalCallback(new Error("no secondary server found"), null, replSetSelf); | ||
} else if(typeof callback === 'function') { | ||
@@ -830,4 +945,4 @@ replSetSelf._serverState = 'disconnected'; | ||
// Perform callback | ||
internalCallback(new Error("no primary server found"), null); | ||
} | ||
internalCallback(new Error("no primary server found"), null, replSetSelf); | ||
} | ||
} else if((self._numberOfServersLeftToInitialize == 0) && replSetSelf._state.errorMessages.length > 0 && replSetSelf._serverState != 'disconnected') { | ||
@@ -842,7 +957,7 @@ // Set done | ||
// Callback to signal we are done | ||
internalCallback(replSetSelf._state.errorMessages[0], null); | ||
internalCallback(replSetSelf._state.errorMessages[0], null, replSetSelf); | ||
} | ||
} | ||
} | ||
// Ensure we have all registered servers in our set | ||
@@ -854,3 +969,3 @@ for(var i = 0; i < serverConnections.length; i++) { | ||
// Initialize all the connections | ||
for(var i = 0; i < serverConnections.length; i++) { | ||
for(var i = 0; i < serverConnections.length; i++) { | ||
// Set up the logger for the server connection | ||
@@ -872,2 +987,4 @@ serverConnections[i].logger = replSetSelf.logger; | ||
socketOptions['port'] = serverConnections[i].port; | ||
// Set fast connect timeout | ||
socketOptions['connectTimeoutMS'] = replSetSelf._connectTimeoutMS | ||
@@ -880,91 +997,3 @@ // Set the socket options | ||
serverConnections[i].connect(parent, {returnIsMasterResults: true, eventReceiver:serverConnections[i]}, self.connectionHandler(serverConnections[i])); | ||
} | ||
// The checking function | ||
this.replicasetCheckFunction = function() { | ||
try { | ||
// Retrieve a reader connection | ||
var con = self.checkoutReader(); | ||
// If we have a connection and we have a db object | ||
if(con != null && Array.isArray(self.dbInstances) && self.dbInstances.length > 0) { | ||
var dbInstance = self.dbInstances[0]; | ||
dbInstance.admin().command({replSetGetStatus:1}, {connection:con}, function(err, result) { | ||
// Paranoid android | ||
if(null == err && null != result && null != result["documents"] && result["documents"].length > 0) { | ||
// For each member we need to check if we have a new connection that needs to be established | ||
var members = result['documents'][0]['members']; | ||
if(null != members) { | ||
// The total members we check | ||
var newServers = 0; | ||
// Iterate over all existing members | ||
for(var i = 0, jlen = members.length; i < jlen; i++) { | ||
// Get a member | ||
var member = members[i]; | ||
// If the node is healthy and it does not exist in the current replicaset, add it to the | ||
// current setup | ||
if(null != self._state && 0 != member['health'] && null == self._state['addresses'][member['name']]) { | ||
// We need to add a server to the connection, this means going through the notions of establishing | ||
// A completely new connection | ||
// Found a new server | ||
newServers = newServers + 1; | ||
// Split the server string | ||
var parts = member.name.split(/:/); | ||
if(parts.length == 1) { | ||
parts = [parts[0], Connection.DEFAULT_PORT]; | ||
} | ||
// Default empty socket options object | ||
var socketOptions = {}; | ||
// If a socket option object exists clone it | ||
if(self.socketOptions != null) { | ||
var keys = Object.keys(self.socketOptions); | ||
for(var k = 0; k < keys.length;k++) socketOptions[keys[i]] = self.socketOptions[keys[i]]; | ||
} | ||
// Add host information to socket options | ||
socketOptions['host'] = parts[0]; | ||
socketOptions['port'] = parseInt(parts[1]); | ||
// Create a new server instance | ||
var newServer = new Server(parts[0], parseInt(parts[1]), {auto_reconnect:false, 'socketOptions':socketOptions | ||
, logger:self.logger, ssl:self.ssl, poolSize:self.poolSize}); | ||
// Set the replicaset instance | ||
newServer.replicasetInstance = self; | ||
// Add handlers | ||
newServer.on("close", self.closeHandler); | ||
newServer.on("timeout", self.timeoutHandler); | ||
newServer.on("error", self.errorHandler); | ||
// Add a new server to the total number of servers that need to initialized before we are done | ||
var newServerCallback = self.connectionHandler(newServer); | ||
// Let's set up a new server instance | ||
newServer.connect(self.db, {returnIsMasterResults: true, eventReceiver:newServer}, function(err, result) { | ||
// Remove from number of newServers | ||
newServers = newServers - 1; | ||
// Call the setup | ||
newServerCallback(err, result); | ||
// If we have 0 new servers let's go back to rechecking | ||
if(newServers <= 0) { | ||
setTimeout(self.replicasetCheckFunction, self.replicasetStatusCheckInterval); | ||
} | ||
}); | ||
} | ||
} | ||
// If we have no new servers check status again | ||
if(newServers == 0) { | ||
setTimeout(self.replicasetCheckFunction, self.replicasetStatusCheckInterval); | ||
} | ||
} | ||
} | ||
}); | ||
} | ||
} catch(err) { | ||
setTimeout(self.replicasetCheckFunction, self.replicasetStatusCheckInterval); | ||
} | ||
}; | ||
} | ||
} | ||
@@ -977,3 +1006,3 @@ | ||
// Establish connection | ||
var connection = this._state.master != null ? this._state.master.checkoutWriter() : null; | ||
var connection = this._state.master != null ? this._state.master.checkoutWriter() : null; | ||
// Return the connection | ||
@@ -986,10 +1015,119 @@ return connection; | ||
*/ | ||
ReplSet.prototype.checkoutReader = function() { | ||
var pickFirstConnectedSecondary = function pickFirstConnectedSecondary(self, tags) { | ||
var keys = Object.keys(self._state.secondaries); | ||
var connection = null; | ||
// Find first available reader if any | ||
for(var i = 0; i < keys.length; i++) { | ||
connection = self._state.secondaries[keys[i]].checkoutReader(); | ||
if(connection != null) break; | ||
} | ||
// If we still have a null, read from primary if it's not secondary only | ||
if(self._readPreference == ReadPreference.SECONDARY_PREFERRED) { | ||
connection = self._state.master.checkoutReader(); | ||
} | ||
if(connection == null) { | ||
var preferenceName = self._readPreference == ReadPreference.SECONDARY_PREFERRED ? 'secondary' : self._readPreference; | ||
return new Error("No replica set member available for query with ReadPreference " + preferenceName + " and tags " + JSON.stringify(tags)); | ||
} | ||
// Return the connection | ||
return connection; | ||
} | ||
/** | ||
* @ignore | ||
*/ | ||
var _pickFromTags = function(self, tags) { | ||
// If we have an array or single tag selection | ||
var tagObjects = Array.isArray(tags) ? tags : [tags]; | ||
// Iterate over all tags until we find a candidate server | ||
for(var _i = 0; _i < tagObjects.length; _i++) { | ||
// Grab a tag object | ||
var tagObject = tagObjects[_i]; | ||
// Matching keys | ||
var matchingKeys = Object.keys(tagObject); | ||
// Match all the servers that match the provdided tags | ||
var keys = Object.keys(self._state.secondaries); | ||
var candidateServers = []; | ||
for(var i = 0; i < keys.length; i++) { | ||
var server = self._state.secondaries[keys[i]]; | ||
// If we have tags match | ||
if(server.tags != null) { | ||
var matching = true; | ||
// Ensure we have all the values | ||
for(var j = 0; j < matchingKeys.length; j++) { | ||
if(server.tags[matchingKeys[j]] != tagObject[matchingKeys[j]]) { | ||
matching = false; | ||
break; | ||
} | ||
} | ||
// If we have a match add it to the list of matching servers | ||
if(matching) { | ||
candidateServers.push(server); | ||
} | ||
} | ||
} | ||
// If we have a candidate server return | ||
if(candidateServers.length > 0) { | ||
if(this.strategyInstance) return this.strategyInstance.checkoutSecondary(tags, candidateServers); | ||
// Set instance to return | ||
return candidateServers[Math.floor(Math.random() * candidateServers.length)].checkoutReader(); | ||
} | ||
} | ||
// No connection found | ||
return null; | ||
} | ||
/** | ||
* @ignore | ||
*/ | ||
ReplSet.prototype.checkoutReader = function(readPreference, tags) { | ||
var connection = null; | ||
// If we have a read preference object unpack it | ||
if(typeof readPreference == 'object' && readPreference['_type'] == 'ReadPreference') { | ||
tags = readPreference.tags; | ||
readPreference = readPreference.mode; | ||
} else if(typeof readPreference == 'object' && readPreference['_type'] != 'ReadPreference') { | ||
throw new Error("read preferences must be either a string or an instance of ReadPreference"); | ||
} | ||
// Set up our read Preference, allowing us to override the readPreference | ||
var finalReadPreference = readPreference != null ? readPreference : this._readPreference; | ||
finalReadPreference = finalReadPreference == true ? ReadPreference.SECONDARY_PREFERRED : finalReadPreference; | ||
// If we are reading from a primary | ||
if(finalReadPreference == 'primary') { | ||
// If we provide a tags set send an error | ||
if(typeof tags == 'object' && tags != null) { | ||
return new Error("PRIMARY cannot be combined with tags"); | ||
} | ||
// If we provide a tags set send an error | ||
if(this._state.master == null) { | ||
return new Error("No replica set primary available for query with ReadPreference PRIMARY"); | ||
} | ||
// Checkout a writer | ||
return this.checkoutWriter(); | ||
} | ||
// If we have specified to read from a secondary server grab a random one and read | ||
// from it, otherwise just pass the primary connection | ||
if((this.readSecondary == true || this._readPreference == Server.READ_SECONDARY || this._readPreference == Server.READ_SECONDARY_ONLY) && Object.keys(this._state.secondaries).length > 0) { | ||
// Checkout a secondary server from the passed in set of servers | ||
if(this.strategyInstance != null) { | ||
connection = this.strategyInstance.checkoutSecondary(); | ||
if((this.readSecondary || finalReadPreference == ReadPreference.SECONDARY_PREFERRED || finalReadPreference == ReadPreference.SECONDARY) && Object.keys(this._state.secondaries).length > 0) { | ||
// If we have tags, look for servers matching the specific tag | ||
if(tags != null && typeof tags == 'object') { | ||
// Get connection | ||
connection = _pickFromTags(this, tags);// = function(self, readPreference, tags) { | ||
// No candidate servers that match the tags, error | ||
if(connection == null) { | ||
return new Error("No replica set members available for query"); | ||
} | ||
} else { | ||
@@ -1000,35 +1138,70 @@ // Pick a random key | ||
var key = keys[this._currentServerChoice++]; | ||
connection = this._state.secondaries[key].checkoutReader(); | ||
// Fetch a connectio | ||
connection = this._state.secondaries[key] != null ? this._state.secondaries[key].checkoutReader() : null; | ||
// If connection is null fallback to first available secondary | ||
connection = connection == null ? pickFirstConnectedSecondary(this, tags) : connection; | ||
} | ||
} else if(this._readPreference == Server.READ_SECONDARY_ONLY && Object.keys(this._state.secondaries).length == 0) { | ||
connection = null; | ||
} else if(this._readPreference != null && typeof this._readPreference === 'object') { | ||
// Get all tag keys (used to try to find a server that is valid) | ||
var keys = Object.keys(this._readPreference); | ||
// final instance server | ||
var instanceServer = null; | ||
// for each key look for an avilable instance | ||
for(var i = 0; i < keys.length; i++) { | ||
// Grab subkey value | ||
var value = this._readPreference[keys[i]]; | ||
// Check if we have any servers for the tag, if we do pick a random one | ||
if(this._state.byTags[keys[i]] != null | ||
&& this._state.byTags[keys[i]][value] != null | ||
&& Array.isArray(this._state.byTags[keys[i]][value]) | ||
&& this._state.byTags[keys[i]][value].length > 0) { | ||
// Let's grab an available server from the list using a random pick | ||
var serverInstances = this._state.byTags[keys[i]][value]; | ||
// Set instance to return | ||
instanceServer = serverInstances[Math.floor(Math.random() * serverInstances.length)]; | ||
break; | ||
} else if(finalReadPreference == ReadPreference.PRIMARY_PREFERRED) { | ||
// Check if there is a primary available and return that if possible | ||
connection = this.checkoutWriter(); | ||
// If no connection available checkout a secondary | ||
if(connection == null) { | ||
// If we have tags, look for servers matching the specific tag | ||
if(tags != null && typeof tags == 'object') { | ||
// Get connection | ||
connection = _pickFromTags(this, tags);// = function(self, readPreference, tags) { | ||
// No candidate servers that match the tags, error | ||
if(connection == null) { | ||
return new Error("No replica set members available for query"); | ||
} | ||
} else { | ||
// Pick a random key | ||
var keys = Object.keys(this._state.secondaries); | ||
this._currentServerChoice = this._currentServerChoice % keys.length; | ||
var key = keys[this._currentServerChoice++]; | ||
// Fetch a connectio | ||
connection = this._state.secondaries[key] != null ? this._state.secondaries[key].checkoutReader() : null; | ||
// If connection is null fallback to first available secondary | ||
connection = connection == null ? pickFirstConnectedSecondary(this, tags) : connection; | ||
} | ||
} | ||
// Return the instance of the server | ||
connection = instanceServer != null ? instanceServer.checkoutReader() : this.checkoutWriter(); | ||
} else if(finalReadPreference == ReadPreference.SECONDARY_PREFERRED && tags == null && Object.keys(this._state.secondaries).length == 0) { | ||
connection = this.checkoutWriter(); | ||
// If no connection return an error | ||
if(connection == null) { | ||
var preferenceName = finalReadPreference == ReadPreference.SECONDARY ? 'secondary' : finalReadPreference; | ||
connection = new Error("No replica set member available for query with ReadPreference " + preferenceName + " and tags " + JSON.stringify(tags)); | ||
} | ||
} else if(finalReadPreference == ReadPreference.SECONDARY_PREFERRED) { | ||
// If we have tags, look for servers matching the specific tag | ||
if(tags != null && typeof tags == 'object') { | ||
// Get connection | ||
connection = _pickFromTags(this, tags);// = function(self, readPreference, tags) { | ||
// No candidate servers that match the tags, error | ||
if(connection == null) { | ||
// No secondary server avilable, attemp to checkout a primary server | ||
connection = this.checkoutWriter(); | ||
// If no connection return an error | ||
if(connection == null) { | ||
return new Error("No replica set members available for query"); | ||
} | ||
} | ||
} else if(this.strategyInstance != null) { | ||
connection = this.strategyInstance.checkoutReader(tags); | ||
} | ||
} else if(finalReadPreference == ReadPreference.NEAREST && this.strategyInstance != null) { | ||
connection = this.strategyInstance.checkoutSecondary(tags); | ||
} else if(finalReadPreference == ReadPreference.NEAREST && this.strategyInstance == null) { | ||
return new Error("A strategy for calculating nearness must be enabled such as ping or statistical"); | ||
} else if(finalReadPreference == ReadPreference.SECONDARY && Object.keys(this._state.secondaries).length == 0) { | ||
if(tags != null && typeof tags == 'object') { | ||
var preferenceName = finalReadPreference == ReadPreference.SECONDARY ? 'secondary' : finalReadPreference; | ||
connection = new Error("No replica set member available for query with ReadPreference " + preferenceName + " and tags " + JSON.stringify(tags)); | ||
} else { | ||
connection = new Error("No replica set secondary available for query with ReadPreference SECONDARY"); | ||
} | ||
} else { | ||
connection = this.checkoutWriter(); | ||
} | ||
// Return the connection | ||
@@ -1048,3 +1221,3 @@ return connection; | ||
allConnections = allConnections.concat(allMasterConnections); | ||
// If we have read secondary let's add all secondary servers | ||
@@ -1062,3 +1235,3 @@ if(this.readSecondary && Object.keys(this._state.secondaries).length > 0) { | ||
} | ||
// Return all the conections | ||
@@ -1074,3 +1247,3 @@ return allConnections; | ||
this.recordQueryStats = enable; | ||
// Ensure all existing servers already have the flag set, even if the | ||
// Ensure all existing servers already have the flag set, even if the | ||
// connections are up already or we have not connected yet | ||
@@ -1101,5 +1274,5 @@ if(this._state != null && this._state.addresses != null) { | ||
ReplSet.prototype.close = function(callback) { | ||
var self = this; | ||
var self = this; | ||
// Set server status as disconnected | ||
this._serverState = 'disconnected'; | ||
this._serverState = 'disconnected'; | ||
// Get all the server instances and close them | ||
@@ -1112,5 +1285,6 @@ var allServers = []; | ||
allServers.push(this._state.addresses[keys[i]]); | ||
} | ||
} | ||
} | ||
// Let's process all the closing | ||
@@ -1128,24 +1302,17 @@ var numberOfServersToClose = allServers.length; | ||
var server = allServers[i]; | ||
if(server.isConnected()) { | ||
// Close each server | ||
server.close(function() { | ||
numberOfServersToClose = numberOfServersToClose - 1; | ||
// Clear out state if we are done | ||
if(numberOfServersToClose == 0) { | ||
// Clear out state | ||
self._state = {'master':null, 'secondaries':{}, 'arbiters':{}, 'passives':{}, 'errors':{}, 'addresses':{}, 'byTags':{}, 'setName':null, 'errorMessages':[], 'members':[]}; | ||
} | ||
// Close each server | ||
server.close(function() { | ||
numberOfServersToClose = numberOfServersToClose - 1; | ||
// Clear out state if we are done | ||
if(numberOfServersToClose == 0) { | ||
// Clear out state | ||
self._state = {'master':null, 'secondaries':{}, 'arbiters':{}, 'passives':{} | ||
, 'errors':{}, 'addresses':{}, 'setName':null, 'errorMessages':[], 'members':[]}; | ||
} | ||
// If we are finished perform the call back | ||
if(numberOfServersToClose == 0 && typeof callback === 'function') { | ||
callback(null); | ||
} | ||
}) | ||
} else { | ||
numberOfServersToClose = numberOfServersToClose - 1; | ||
// If we have no more servers perform the callback | ||
// If we are finished perform the call back | ||
if(numberOfServersToClose == 0 && typeof callback === 'function') { | ||
callback(null); | ||
callback(null); | ||
} | ||
} | ||
}) | ||
} | ||
@@ -1171,5 +1338,5 @@ } | ||
if(this._readPreference == null && this.readSecondary) { | ||
return Server.READ_SECONDARY; | ||
return ReadPreference.SECONDARY_PREFERRED; | ||
} else if(this._readPreference == null && !this.readSecondary) { | ||
return Server.READ_PRIMARY; | ||
return ReadPreference.PRIMARY; | ||
} else { | ||
@@ -1179,3 +1346,3 @@ return this._readPreference; | ||
} | ||
}); | ||
}); | ||
@@ -1189,3 +1356,3 @@ /** | ||
var servers = this.allServerInstances(); | ||
return servers[0].dbInstances; | ||
return servers.length > 0 ? servers[0].dbInstances : []; | ||
} | ||
@@ -1229,3 +1396,3 @@ }) | ||
Object.defineProperty(ReplSet.prototype, "secondaries", {enumerable: true | ||
, get: function() { | ||
, get: function() { | ||
var keys = Object.keys(this._state.secondaries); | ||
@@ -1246,3 +1413,3 @@ var array = new Array(keys.length); | ||
Object.defineProperty(ReplSet.prototype, "allSecondaries", {enumerable: true | ||
, get: function() { | ||
, get: function() { | ||
return this.secondaries.concat(this.passives); | ||
@@ -1292,3 +1459,3 @@ } | ||
} | ||
}); | ||
}); | ||
@@ -1295,0 +1462,0 @@ /** |
var Connection = require('./connection').Connection, | ||
ReadPreference = require('./read_preference').ReadPreference, | ||
DbCommand = require('../commands/db_command').DbCommand, | ||
@@ -12,9 +13,10 @@ MongoReply = require('../responses/mongo_reply').MongoReply, | ||
* Options | ||
* - **readPreference** {String, default:null}, set's the read preference (Server.READ_PRIMAR, Server.READ_SECONDARY_ONLY, Server.READ_SECONDARY) | ||
* - **readPreference** {String, default:null}, set's the read preference (ReadPreference.PRIMARY, ReadPreference.PRIMARY_PREFERRED, ReadPreference.SECONDARY, ReadPreference.SECONDARY_PREFERRED, ReadPreference.NEAREST) | ||
* - **ssl** {Boolean, default:false}, use ssl connection (needs to have a mongod server with ssl support) | ||
* - **slaveOk** {Boolean, default:false}, legacy option allowing reads from secondary, use **readPrefrence** instead. | ||
* - **poolSize** {Number, default:1}, number of connections in the connection pool, set to 1 as default for legacy reasons. | ||
* - **socketOptions** {Object, default:null}, an object containing socket options to use (noDelay:(boolean), keepAlive:(number), timeout:(number)) | ||
* - **socketOptions** {Object, default:null}, an object containing socket options to use (noDelay:(boolean), keepAlive:(number), connectTimeoutMS:(number), socketTimeoutMS:(number)) | ||
* - **logger** {Object, default:null}, an object representing a logger that you want to use, needs to support functions debug, log, error **({error:function(message, object) {}, log:function(message, object) {}, debug:function(message, object) {}})**. | ||
* - **auto_reconnect** {Boolean, default:false}, reconnect on error. | ||
* - **disableDriverBSONSizeCheck** {Boolean, default:false}, force the server to error if the BSON message is to big | ||
* | ||
@@ -28,6 +30,6 @@ * @class Represents a Server connection. | ||
// Set up event emitter | ||
EventEmitter.call(this); | ||
EventEmitter.call(this); | ||
// Set up Server instance | ||
if(!(this instanceof Server)) return new Server(host, port, options); | ||
var self = this; | ||
@@ -40,22 +42,23 @@ this.host = host; | ||
this.connected = false; | ||
this.poolSize = this.options.poolSize == null ? 1 : this.options.poolSize; | ||
this.poolSize = this.options.poolSize == null ? 5 : this.options.poolSize; | ||
this.disableDriverBSONSizeCheck = this.options.disableDriverBSONSizeCheck != null ? this.options.disableDriverBSONSizeCheck : false; | ||
this.ssl = this.options.ssl == null ? false : this.options.ssl; | ||
this.slaveOk = this.options["slave_ok"]; | ||
this._used = false; | ||
// Get the readPreference | ||
var readPreference = this.options['readPreference']; | ||
var readPreference = this.options['readPreference']; | ||
// Read preference setting | ||
if(readPreference != null) { | ||
if(readPreference != Server.READ_PRIMARY && readPreference != Server.READ_SECONDARY_ONLY | ||
&& readPreference != Server.READ_SECONDARY) { | ||
if(readPreference != ReadPreference.PRIMARY && readPreference != ReadPreference.SECONDARY | ||
&& readPreference != ReadPreference.SECONDARY_PREFERRED) { | ||
throw new Error("Illegal readPreference mode specified, " + readPreference); | ||
} | ||
// Set read Preference | ||
this._readPreference = readPreference; | ||
} else { | ||
this._readPreference = null; | ||
this._readPreference = null; | ||
} | ||
// Contains the isMaster information returned from the server | ||
@@ -66,2 +69,3 @@ this.isMasterDoc; | ||
this.socketOptions = this.options.socketOptions != null ? this.options.socketOptions : {}; | ||
if(this.disableDriverBSONSizeCheck) this.socketOptions.disableDriverBSONSizeCheck = this.disableDriverBSONSizeCheck; | ||
// Set ssl up if it's defined | ||
@@ -71,8 +75,8 @@ if(this.ssl) { | ||
} | ||
// Set up logger if any set | ||
this.logger = this.options.logger != null | ||
&& (typeof this.options.logger.debug == 'function') | ||
&& (typeof this.options.logger.error == 'function') | ||
&& (typeof this.options.logger.log == 'function') | ||
this.logger = this.options.logger != null | ||
&& (typeof this.options.logger.debug == 'function') | ||
&& (typeof this.options.logger.error == 'function') | ||
&& (typeof this.options.logger.log == 'function') | ||
? this.options.logger : {error:function(message, object) {}, log:function(message, object) {}, debug:function(message, object) {}}; | ||
@@ -86,5 +90,5 @@ | ||
// Contains state information about server connection | ||
this._state = {'runtimeStats': {'queryStats':new RunningStats()}}; | ||
this._state = {'runtimeStats': {'queryStats':new RunningStats()}}; | ||
// Do we record server stats or not | ||
this.recordQueryStats = false; | ||
this.recordQueryStats = false; | ||
}; | ||
@@ -97,7 +101,14 @@ | ||
inherits(Server, EventEmitter); | ||
// Read Preferences | ||
Server.READ_PRIMARY = 'primary'; | ||
Server.READ_SECONDARY = 'secondary'; | ||
Server.READ_SECONDARY_ONLY = 'secondaryOnly'; | ||
// Always ourselves | ||
// | ||
// Deprecated, USE ReadPreferences class | ||
// | ||
Server.READ_PRIMARY = ReadPreference.PRIMARY; | ||
Server.READ_SECONDARY = ReadPreference.SECONDARY_PREFERRED; | ||
Server.READ_SECONDARY_ONLY = ReadPreference.SECONDARY; | ||
/** | ||
* Always ourselves | ||
* @ignore | ||
*/ | ||
Server.prototype.setReadPreference = function() {} | ||
@@ -108,3 +119,10 @@ | ||
*/ | ||
Server.prototype._isUsed = function() { | ||
Server.prototype.isMongos = function() { | ||
return this.isMasterDoc != null && this.isMasterDoc['msg'] == "isdbgrid" ? true : false; | ||
} | ||
/** | ||
* @ignore | ||
*/ | ||
Server.prototype._isUsed = function() { | ||
return this._used; | ||
@@ -116,3 +134,3 @@ } | ||
*/ | ||
Server.prototype.close = function(callback) { | ||
Server.prototype.close = function(callback) { | ||
// Remove all local listeners | ||
@@ -129,3 +147,3 @@ this.removeAllListeners(); | ||
// Set server status as disconnected | ||
this._serverState = 'disconnected'; | ||
this._serverState = 'disconnected'; | ||
// Peform callback if present | ||
@@ -153,3 +171,3 @@ if(typeof callback === 'function') callback(); | ||
Server.prototype.isSetMember = function() { | ||
return this['replicasetInstance'] != null; | ||
return this['replicasetInstance'] != null || this['mongosInstance'] != null; | ||
} | ||
@@ -161,6 +179,6 @@ | ||
Server.prototype.connect = function(dbInstance, options, callback) { | ||
if('function' === typeof options) callback = options, options = {}; | ||
if('function' === typeof options) callback = options, options = {}; | ||
if(options == null) options = {}; | ||
if(!('function' === typeof callback)) callback = null; | ||
// Currently needed to work around problems with multiple connections in a pool with ssl | ||
@@ -170,9 +188,9 @@ // TODO fix if possible | ||
// Set up socket options for ssl | ||
this.socketOptions.ssl = true; | ||
this.socketOptions.ssl = true; | ||
} | ||
// Let's connect | ||
var server = this; | ||
// Let's us override the main receiver of events | ||
var eventReceiver = options.eventReceiver != null ? options.eventReceiver : this; | ||
var eventReceiver = options.eventReceiver != null ? options.eventReceiver : this; | ||
// Creating dbInstance | ||
@@ -183,2 +201,5 @@ this.dbInstance = dbInstance; | ||
// Force connection pool if there is one | ||
if(server.connectionPool) server.connectionPool.stop(); | ||
// Set server state to connecting | ||
@@ -192,11 +213,11 @@ this._serverState = 'connecting'; | ||
connectionPool.logger = this.logger; | ||
// Set up a new pool using default settings | ||
server.connectionPool = connectionPool; | ||
// Set basic parameters passed in | ||
var returnIsMasterResults = options.returnIsMasterResults == null ? false : options.returnIsMasterResults; | ||
// Create a default connect handler, overriden when using replicasets | ||
var connectCallback = function(err, reply) { | ||
var connectCallback = function(err, reply) { | ||
// ensure no callbacks get called twice | ||
@@ -207,3 +228,3 @@ var internalCallback = callback; | ||
// proxy killed connection etc, ignore the erorr as close event was isssued | ||
if(err != null && internalCallback == null) return; | ||
if(err != null && internalCallback == null) return; | ||
// Internal callback | ||
@@ -217,18 +238,21 @@ if(err != null) return internalCallback(err, null); | ||
server.isMasterDoc = reply.documents[0]; | ||
// Emit open event | ||
_emitAcrossAllDbInstances(server, eventReceiver, "open", null, returnIsMasterResults ? reply : dbInstance, null); | ||
// If we have it set to returnIsMasterResults | ||
if(returnIsMasterResults) { | ||
internalCallback(null, reply); | ||
internalCallback(null, reply, server); | ||
} else { | ||
internalCallback(null, dbInstance); | ||
internalCallback(null, dbInstance, server); | ||
} | ||
}; | ||
// Let's us override the main connect callback | ||
var connectHandler = options.connectHandler == null ? connectCallback : options.connectHandler; | ||
var connectHandler = options.connectHandler == null ? connectCallback : options.connectHandler; | ||
// Set up on connect method | ||
connectionPool.on("poolReady", function() { | ||
connectionPool.on("poolReady", function() { | ||
// Create db command and Add the callback to the list of callbacks by the request id (mapping outgoing messages to correct callbacks) | ||
var db_command = DbCommand.NcreateIsMasterCommand(dbInstance, dbInstance.databaseName); | ||
var db_command = DbCommand.NcreateIsMasterCommand(dbInstance, dbInstance.databaseName); | ||
// Check out a reader from the pool | ||
@@ -241,3 +265,3 @@ var connection = connectionPool.checkoutConnection(); | ||
dbInstance._registerHandler(db_command, false, connection, connectHandler); | ||
// Write the command out | ||
@@ -254,3 +278,3 @@ connection.write(db_command); | ||
// Parse the header | ||
mongoReply.parseHeader(message, connectionPool.bson) | ||
mongoReply.parseHeader(message, connectionPool.bson) | ||
// If message size is not the same as the buffer size | ||
@@ -260,12 +284,12 @@ // something went terribly wrong somewhere | ||
// Emit the error | ||
if(eventReceiver.listeners["error"] && eventReceiver.listeners["error"].length > 0) eventReceiver.emit("error", new Error("bson length is different from message length"), server); | ||
if(eventReceiver.listeners("error") && eventReceiver.listeners("error").length > 0) eventReceiver.emit("error", new Error("bson length is different from message length"), server); | ||
// Remove all listeners | ||
server.removeAllListeners(); | ||
} else { | ||
} else { | ||
var startDate = new Date().getTime(); | ||
// Callback instance | ||
var callbackInfo = null; | ||
var dbInstanceObject = null; | ||
// Locate a callback instance and remove any additional ones | ||
@@ -283,3 +307,3 @@ for(var i = 0; i < server.dbInstances.length; i++) { | ||
} | ||
// Only execute callback if we have a caller | ||
@@ -293,3 +317,3 @@ if(callbackInfo.callback && Array.isArray(callbackInfo.info.chained)) { | ||
} | ||
// If we have already fired then clean up rest of chain and move on | ||
@@ -300,9 +324,43 @@ if(numberOfFoundCallbacks != chained.length) { | ||
} | ||
// Just return from function | ||
return; | ||
} | ||
// Parse the body | ||
mongoReply.parseBody(message, connectionPool.bson, callbackInfo.info.raw, function(err) { | ||
if(err != null) { | ||
// If pool connection is already closed | ||
if(server._serverState === 'disconnected') return; | ||
// Set server state to disconnected | ||
server._serverState = 'disconnected'; | ||
// Remove all listeners and close the connection pool | ||
server.removeAllListeners(); | ||
connectionPool.stop(true); | ||
// If we have a callback return the error | ||
if(typeof callback === 'function') { | ||
// ensure no callbacks get called twice | ||
var internalCallback = callback; | ||
callback = null; | ||
// Perform callback | ||
internalCallback(new Error("connection closed due to parseError"), null, server); | ||
} else if(server.isSetMember()) { | ||
if(server.listeners("parseError") && server.listeners("parseError").length > 0) server.emit("parseError", new Error("connection closed due to parseError"), server); | ||
} else { | ||
if(eventReceiver.listeners("parseError") && eventReceiver.listeners("parseError").length > 0) eventReceiver.emit("parseError", new Error("connection closed due to parseError"), server); | ||
} | ||
// If we are a single server connection fire errors correctly | ||
if(!server.isSetMember()) { | ||
// Fire all callback errors | ||
_fireCallbackErrors(server, new Error("connection closed due to parseError")); | ||
// Emit error | ||
_emitAcrossAllDbInstances(server, eventReceiver, "parseError", server, null, true); | ||
} | ||
// Short cut | ||
return; | ||
} | ||
// Fetch the callback | ||
var callbackInfo = dbInstanceObject._findHandler(mongoReply.responseTo.toString()); | ||
@@ -319,3 +377,3 @@ // If we have an error let's execute the callback and clean up all other | ||
var chainedIds = callbackInfo.info.chained; | ||
if(chainedIds.length > 0 && chainedIds[chainedIds.length - 1] == mongoReply.responseTo) { | ||
@@ -325,3 +383,3 @@ // Cleanup all other chained calls | ||
// Remove listeners | ||
for(var i = 0; i < chainedIds.length; i++) dbInstanceObject._removeHandler(chainedIds[i]); | ||
for(var i = 0; i < chainedIds.length; i++) dbInstanceObject._removeHandler(chainedIds[i]); | ||
// Call the handler | ||
@@ -333,3 +391,3 @@ dbInstanceObject._callHandler(mongoReply.responseTo, callbackInfo.info.results.shift(), null); | ||
var handler = dbInstanceObject._findHandler(chainedIds[i]); | ||
// Check if we have an object, if it's the case take the current object commands and | ||
// Check if we have an object, if it's the case take the current object commands and | ||
// and add this one | ||
@@ -340,21 +398,54 @@ if(handler.info != null) { | ||
} | ||
} | ||
} | ||
} | ||
} | ||
}); | ||
}); | ||
} else if(callbackInfo.callback) { | ||
// Parse the body | ||
mongoReply.parseBody(message, connectionPool.bson, callbackInfo.info.raw, function(err) { | ||
if(err != null) { | ||
// If pool connection is already closed | ||
if(server._serverState === 'disconnected') return; | ||
// Set server state to disconnected | ||
server._serverState = 'disconnected'; | ||
// Remove all listeners and close the connection pool | ||
server.removeAllListeners(); | ||
connectionPool.stop(true); | ||
// If we have a callback return the error | ||
if(typeof callback === 'function') { | ||
// ensure no callbacks get called twice | ||
var internalCallback = callback; | ||
callback = null; | ||
// Perform callback | ||
internalCallback(new Error("connection closed due to parseError"), null, server); | ||
} else if(server.isSetMember()) { | ||
if(server.listeners("parseError") && server.listeners("parseError").length > 0) server.emit("parseError", new Error("connection closed due to parseError"), server); | ||
} else { | ||
if(eventReceiver.listeners("parseError") && eventReceiver.listeners("parseError").length > 0) eventReceiver.emit("parseError", new Error("connection closed due to parseError"), server); | ||
} | ||
// If we are a single server connection fire errors correctly | ||
if(!server.isSetMember()) { | ||
// Fire all callback errors | ||
_fireCallbackErrors(server, new Error("connection closed due to parseError")); | ||
// Emit error | ||
_emitAcrossAllDbInstances(server, eventReceiver, "parseError", server, null, true); | ||
} | ||
// Short cut | ||
return; | ||
} | ||
// Let's record the stats info if it's enabled | ||
if(server.recordQueryStats == true && server._state['runtimeStats'] != null | ||
if(server.recordQueryStats == true && server._state['runtimeStats'] != null | ||
&& server._state.runtimeStats['queryStats'] instanceof RunningStats) { | ||
// Add data point to the running statistics object | ||
server._state.runtimeStats.queryStats.push(new Date().getTime() - callbackInfo.info.start); | ||
} | ||
} | ||
// Trigger the callback | ||
dbInstanceObject._callHandler(mongoReply.responseTo, mongoReply, null); | ||
}); | ||
} | ||
} | ||
}); | ||
} | ||
} | ||
} catch (err) { | ||
@@ -365,5 +456,5 @@ // Throw error in next tick | ||
}) | ||
} | ||
} | ||
}); | ||
// Handle timeout | ||
@@ -375,4 +466,2 @@ connectionPool.on("timeout", function(err) { | ||
server._serverState = 'disconnected'; | ||
// // Close the pool | ||
// connectionPool.stop(); | ||
// If we have a callback return the error | ||
@@ -384,20 +473,20 @@ if(typeof callback === 'function') { | ||
// Perform callback | ||
internalCallback(err, null); | ||
internalCallback(err, null, server); | ||
} else if(server.isSetMember()) { | ||
if(server.listeners["timeout"] && server.listeners["timeout"].length > 0) server.emit("timeout", err, server); | ||
if(server.listeners("timeout") && server.listeners("timeout").length > 0) server.emit("timeout", err, server); | ||
} else { | ||
if(eventReceiver.listeners["timeout"] && eventReceiver.listeners["timeout"].length > 0) eventReceiver.emit("timeout", err, server); | ||
if(eventReceiver.listeners("timeout") && eventReceiver.listeners("timeout").length > 0) eventReceiver.emit("timeout", err, server); | ||
} | ||
// If we are a single server connection fire errors correctly | ||
if(!server.isSetMember()) { | ||
// Fire all callback errors | ||
_fireCallbackErrors(server, err); | ||
_fireCallbackErrors(server, err); | ||
// Emit error | ||
_emitAcrossAllDbInstances(server, eventReceiver, "timeout", err, server); | ||
_emitAcrossAllDbInstances(server, eventReceiver, "timeout", err, server, true); | ||
} | ||
}); | ||
// Handle errors | ||
connectionPool.on("error", function(message) { | ||
connectionPool.on("error", function(message) { | ||
// If pool connection is already closed | ||
@@ -408,3 +497,3 @@ if(server._serverState === 'disconnected') return; | ||
// If we have a callback return the error | ||
if(typeof callback === 'function') {// && !server.isSetMember()) { | ||
if(typeof callback === 'function') { | ||
// ensure no callbacks get called twice | ||
@@ -414,18 +503,18 @@ var internalCallback = callback; | ||
// Perform callback | ||
internalCallback(new Error(message && message.err ? message.err : message), null); | ||
internalCallback(new Error(message && message.err ? message.err : message), null, server); | ||
} else if(server.isSetMember()) { | ||
if(server.listeners["error"] && server.listeners["error"].length > 0) server.emit("error", new Error(message && message.err ? message.err : message), server); | ||
if(server.listeners("error") && server.listeners("error").length > 0) server.emit("error", new Error(message && message.err ? message.err : message), server); | ||
} else { | ||
if(eventReceiver.listeners["error"] && eventReceiver.listeners["error"].length > 0) eventReceiver.emit("error", new Error(message && message.err ? message.err : message), server); | ||
if(eventReceiver.listeners("error") && eventReceiver.listeners("error").length > 0) eventReceiver.emit("error", new Error(message && message.err ? message.err : message), server); | ||
} | ||
// If we are a single server connection fire errors correctly | ||
if(!server.isSetMember()) { | ||
// Fire all callback errors | ||
_fireCallbackErrors(server, new Error(message && message.err ? message.err : message)); | ||
_fireCallbackErrors(server, new Error(message && message.err ? message.err : message)); | ||
// Emit error | ||
_emitAcrossAllDbInstances(server, eventReceiver, "error", new Error(message && message.err ? message.err : message), server); | ||
_emitAcrossAllDbInstances(server, eventReceiver, "error", new Error(message && message.err ? message.err : message), server, true); | ||
} | ||
}); | ||
// Handle close events | ||
@@ -437,4 +526,2 @@ connectionPool.on("close", function() { | ||
server._serverState = 'disconnected'; | ||
// // Close the pool | ||
// connectionPool.stop(true); | ||
// If we have a callback return the error | ||
@@ -446,15 +533,15 @@ if(typeof callback == 'function') { | ||
// Perform callback | ||
internalCallback(new Error("connection closed"), null); | ||
internalCallback(new Error("connection closed"), null, server); | ||
} else if(server.isSetMember()) { | ||
if(server.listeners["close"] && server.listeners["close"].length > 0) server.emit("close", new Error("connection closed"), server); | ||
if(server.listeners("close") && server.listeners("close").length > 0) server.emit("close", new Error("connection closed"), server); | ||
} else { | ||
if(eventReceiver.listeners["close"] && eventReceiver.listeners["close"].length > 0) eventReceiver.emit("close", new Error("connection closed"), server); | ||
if(eventReceiver.listeners("close") && eventReceiver.listeners("close").length > 0) eventReceiver.emit("close", new Error("connection closed"), server); | ||
} | ||
// If we are a single server connection fire errors correctly | ||
if(!server.isSetMember()) { | ||
// Fire all callback errors | ||
_fireCallbackErrors(server, new Error("connection closed")); | ||
_fireCallbackErrors(server, new Error("connection closed")); | ||
// Emit error | ||
_emitAcrossAllDbInstances(server, eventReceiver, "close", server); | ||
_emitAcrossAllDbInstances(server, eventReceiver, "close", server, null, true); | ||
} | ||
@@ -465,3 +552,3 @@ }); | ||
// error | ||
connectionPool.on("parseError", function(message) { | ||
connectionPool.on("parseError", function(message) { | ||
// If pool connection is already closed | ||
@@ -479,18 +566,18 @@ if(server._serverState === 'disconnected') return; | ||
// Perform callback | ||
internalCallback(new Error("connection closed due to parseError"), null); | ||
internalCallback(new Error("connection closed due to parseError"), null, server); | ||
} else if(server.isSetMember()) { | ||
if(server.listeners["parseError"] && server.listeners["parseError"].length > 0) server.emit("parseError", new Error("connection closed due to parseError"), server); | ||
if(server.listeners("parseError") && server.listeners("parseError").length > 0) server.emit("parseError", new Error("connection closed due to parseError"), server); | ||
} else { | ||
if(eventReceiver.listeners["parseError"] && eventReceiver.listeners["parseError"].length > 0) eventReceiver.emit("parseError", new Error("connection closed due to parseError"), server); | ||
if(eventReceiver.listeners("parseError") && eventReceiver.listeners("parseError").length > 0) eventReceiver.emit("parseError", new Error("connection closed due to parseError"), server); | ||
} | ||
// If we are a single server connection fire errors correctly | ||
if(!server.isSetMember()) { | ||
// Fire all callback errors | ||
_fireCallbackErrors(server, new Error("connection closed due to parseError")); | ||
_fireCallbackErrors(server, new Error("connection closed due to parseError")); | ||
// Emit error | ||
_emitAcrossAllDbInstances(server, eventReceiver, "parseError", server); | ||
_emitAcrossAllDbInstances(server, eventReceiver, "parseError", server, null, true); | ||
} | ||
}); | ||
// Boot up connection poole, pass in a locator of callbacks | ||
@@ -518,8 +605,7 @@ connectionPool.start(); | ||
// Only callback once and the last one is the right one | ||
var finalCallback = chained.pop(); | ||
// console.dir(finalCallback) | ||
var finalCallback = chained.pop(); | ||
if(info.connection.socketOptions.host === server.host && info.connection.socketOptions.port === server.port) { | ||
dbInstance._callBackStore.emit(finalCallback, err, null); | ||
} | ||
} | ||
// Put back the final callback to ensure we don't call all commands in the chain | ||
@@ -535,6 +621,6 @@ chained.push(finalCallback); | ||
dbInstance._callBackStore.emit(keys[j], err, null); | ||
} | ||
} | ||
} | ||
} | ||
} | ||
} | ||
} | ||
} | ||
@@ -545,3 +631,3 @@ | ||
*/ | ||
var _emitAcrossAllDbInstances = function(server, filterDb, event, message, object) { | ||
var _emitAcrossAllDbInstances = function(server, filterDb, event, message, object, resetConnection) { | ||
// Emit close event across all db instances sharing the sockets | ||
@@ -552,8 +638,13 @@ var allServerInstances = server.allServerInstances(); | ||
// For all db instances signal all db instances | ||
if(Array.isArray(serverInstance.dbInstances) && serverInstance.dbInstances.length > 1) { | ||
if(Array.isArray(serverInstance.dbInstances) && serverInstance.dbInstances.length >= 1) { | ||
for(var i = 0; i < serverInstance.dbInstances.length; i++) { | ||
var dbInstance = serverInstance.dbInstances[i]; | ||
// Set the parent | ||
if(resetConnection && typeof dbInstance.openCalled != 'undefined') | ||
dbInstance.openCalled = false; | ||
// Check if it's our current db instance and skip if it is | ||
if(filterDb == null || filterDb.databaseName !== dbInstance.databaseName || filterDb.tag !== dbInstance.tag) { | ||
dbInstance.emit(event, message, object); | ||
// Only emit if there is a listener | ||
if(dbInstance.listeners(event).length > 0) | ||
dbInstance.emit(event, message, object); | ||
} | ||
@@ -569,3 +660,3 @@ } | ||
return this.connectionPool.getAllConnections(); | ||
} | ||
} | ||
@@ -576,3 +667,3 @@ /** | ||
*/ | ||
var canCheckoutWriter = function(self, read) { | ||
var canCheckoutWriter = function(self, read) { | ||
// We cannot write to an arbiter or secondary server | ||
@@ -583,6 +674,6 @@ if(self.isMasterDoc['arbiterOnly'] == true) { | ||
return new Error("Cannot write to a secondary"); | ||
} else if(read == true && self._readPreference == Server.READ_SECONDARY_ONLY && self.isMasterDoc['ismaster'] == true) { | ||
} else if(read == true && self._readPreference == ReadPreference.SECONDARY && self.isMasterDoc['ismaster'] == true) { | ||
return new Error("Cannot read from primary when secondary only specified"); | ||
} | ||
// Return no error | ||
@@ -596,11 +687,13 @@ return null; | ||
Server.prototype.checkoutWriter = function(read) { | ||
if(read == true) return this.connectionPool.checkoutConnection(); | ||
if(read == true) return this.connectionPool.checkoutConnection(); | ||
// Check if are allowed to do a checkout (if we try to use an arbiter f.ex) | ||
var result = canCheckoutWriter(this, read); | ||
// If the result is null check out a writer | ||
if(result == null) { | ||
return this.connectionPool.checkoutConnection(); | ||
if(result == null && this.connectionPool != null) { | ||
return this.connectionPool.checkoutConnection(); | ||
} else if(result == null) { | ||
return null; | ||
} else { | ||
return result; | ||
} | ||
} | ||
} | ||
@@ -612,15 +705,15 @@ | ||
*/ | ||
var canCheckoutReader = function(self) { | ||
var canCheckoutReader = function(self) { | ||
// We cannot write to an arbiter or secondary server | ||
if(self.isMasterDoc['arbiterOnly'] == true) { | ||
if(self.isMasterDoc && self.isMasterDoc['arbiterOnly'] == true) { | ||
return new Error("Cannot write to an arbiter"); | ||
} else if(self._readPreference != null) { | ||
// If the read preference is Primary and the instance is not a master return an error | ||
if(self._readPreference == Server.READ_PRIMARY && self.isMasterDoc['ismaster'] != true) { | ||
return new Error("Read preference is " + Server.READ_PRIMARY + " and server is not master"); | ||
} else if(self._readPreference == Server.READ_SECONDARY_ONLY && self.isMasterDoc['ismaster'] == true) { | ||
return new Error("Cannot read from primary when secondary only specified"); | ||
if((self._readPreference == ReadPreference.PRIMARY) && self.isMasterDoc['ismaster'] != true) { | ||
return new Error("Read preference is Server.PRIMARY and server is not master"); | ||
} else if(self._readPreference == ReadPreference.SECONDARY && self.isMasterDoc['ismaster'] == true) { | ||
return new Error("Cannot read from primary when secondary only specified"); | ||
} | ||
} | ||
// Return no error | ||
@@ -637,7 +730,9 @@ return null; | ||
// If the result is null check out a writer | ||
if(result == null) { | ||
return this.connectionPool.checkoutConnection(); | ||
if(result == null && this.connectionPool != null) { | ||
return this.connectionPool.checkoutConnection(); | ||
} else if(result == null) { | ||
return null; | ||
} else { | ||
return result; | ||
} | ||
} | ||
} | ||
@@ -653,3 +748,3 @@ | ||
/** | ||
* Internal statistics object used for calculating average and standard devitation on | ||
* Internal statistics object used for calculating average and standard devitation on | ||
* running queries | ||
@@ -664,3 +759,3 @@ * @ignore | ||
this.m_newM = 0.0; | ||
this.m_newS = 0.0; | ||
this.m_newS = 0.0; | ||
@@ -685,6 +780,6 @@ // Define getters | ||
Object.defineProperty(this, "sScore", { enumerable: true | ||
, get: function () { | ||
, get: function () { | ||
var bottom = this.mean + this.standardDeviation; | ||
if(bottom == 0) return 0; | ||
return ((2 * this.mean * this.standardDeviation)/(bottom)); | ||
return ((2 * this.mean * this.standardDeviation)/(bottom)); | ||
} | ||
@@ -709,4 +804,4 @@ }); | ||
// set up for next iteration | ||
this.m_oldM = this.m_newM; | ||
this.m_oldS = this.m_newS; | ||
this.m_oldM = this.m_newM; | ||
this.m_oldS = this.m_newS; | ||
} | ||
@@ -722,3 +817,3 @@ } | ||
} | ||
}); | ||
}); | ||
@@ -735,3 +830,3 @@ /** | ||
} | ||
}); | ||
}); | ||
@@ -748,3 +843,3 @@ /** | ||
} | ||
}); | ||
}); | ||
@@ -758,3 +853,3 @@ /** | ||
} | ||
}); | ||
}); | ||
@@ -769,3 +864,3 @@ /** | ||
} | ||
}); | ||
}); | ||
@@ -779,3 +874,3 @@ /** | ||
} | ||
}); | ||
}); | ||
@@ -796,3 +891,3 @@ /** | ||
} | ||
}); | ||
}); | ||
@@ -799,0 +894,0 @@ /** |
@@ -6,4 +6,5 @@ var Server = require("../server").Server; | ||
// return time for the db command {ping:true} | ||
var PingStrategy = exports.PingStrategy = function(replicaset) { | ||
var PingStrategy = exports.PingStrategy = function(replicaset, secondaryAcceptableLatencyMS) { | ||
this.replicaset = replicaset; | ||
this.secondaryAcceptableLatencyMS = secondaryAcceptableLatencyMS; | ||
this.state = 'disconnected'; | ||
@@ -18,7 +19,7 @@ // Class instance | ||
// Start ping server | ||
this._pingServer(callback); | ||
this._pingServer(callback); | ||
} | ||
// Stops and kills any processes running | ||
PingStrategy.prototype.stop = function(callback) { | ||
PingStrategy.prototype.stop = function(callback) { | ||
// Stop the ping process | ||
@@ -30,30 +31,77 @@ this.state = 'disconnected'; | ||
PingStrategy.prototype.checkoutSecondary = function() { | ||
// Get all secondary server keys | ||
var keys = Object.keys(this.replicaset._state.secondaries); | ||
// Contains the picked instance | ||
var minimumPingMs = null; | ||
var selectedInstance = null; | ||
// Pick server key by the lowest ping time | ||
for(var i = 0; i < keys.length; i++) { | ||
// Fetch a server | ||
var server = this.replicaset._state.secondaries[keys[i]]; | ||
// If we don't have a ping time use it | ||
if(server.runtimeStats['pingMs'] == null) { | ||
// Set to 0 ms for the start | ||
server.runtimeStats['pingMs'] = 0; | ||
// Pick server | ||
selectedInstance = server; | ||
break; | ||
} else { | ||
// If the next server's ping time is less than the current one choose than one | ||
if(minimumPingMs == null || server.runtimeStats['pingMs'] < minimumPingMs) { | ||
minimumPingMs = server.runtimeStats['pingMs']; | ||
selectedInstance = server; | ||
PingStrategy.prototype.checkoutSecondary = function(tags, secondaryCandidates) { | ||
// Servers are picked based on the lowest ping time and then servers that lower than that + secondaryAcceptableLatencyMS | ||
// Create a list of candidat servers, containing the primary if available | ||
var candidateServers = []; | ||
// If we have not provided a list of candidate servers use the default setup | ||
if(!Array.isArray(secondaryCandidates)) { | ||
candidateServers = this.replicaset._state.master != null ? [this.replicaset._state.master] : []; | ||
// Add all the secondaries | ||
var keys = Object.keys(this.replicaset._state.secondaries); | ||
for(var i = 0; i < keys.length; i++) { | ||
candidateServers.push(this.replicaset._state.secondaries[keys[i]]) | ||
} | ||
} else { | ||
candidateServers = secondaryCandidates; | ||
} | ||
// Final list of eligable server | ||
var finalCandidates = []; | ||
// If we have tags filter by tags | ||
if(tags != null && typeof tags == 'object') { | ||
// If we have an array or single tag selection | ||
var tagObjects = Array.isArray(tags) ? tags : [tags]; | ||
// Iterate over all tags until we find a candidate server | ||
for(var _i = 0; _i < tagObjects.length; _i++) { | ||
// Grab a tag object | ||
var tagObject = tagObjects[_i]; | ||
// Matching keys | ||
var matchingKeys = Object.keys(tagObject); | ||
// Remove any that are not tagged correctly | ||
for(var i = 0; i < candidateServers.length; i++) { | ||
var server = candidateServers[i]; | ||
// If we have tags match | ||
if(server.tags != null) { | ||
var matching = true; | ||
// Ensure we have all the values | ||
for(var j = 0; j < matchingKeys.length; j++) { | ||
if(server.tags[matchingKeys[j]] != tagObject[matchingKeys[j]]) { | ||
matching = false; | ||
break; | ||
} | ||
} | ||
// If we have a match add it to the list of matching servers | ||
if(matching) { | ||
finalCandidates.push(server); | ||
} | ||
} | ||
} | ||
} | ||
} else { | ||
// Final array candidates | ||
var finalCandidates = candidateServers; | ||
} | ||
// Return the selected instance | ||
return selectedInstance != null ? selectedInstance.checkoutReader() : null; | ||
// Sort by ping time | ||
finalCandidates.sort(function(a, b) { | ||
return a.runtimeStats['pingMs'] > b.runtimeStats['pingMs']; | ||
}); | ||
// Cut off the array of anything slower than the [0].pingMs + secondaryAcceptableLatencyMS | ||
for(var i = 0; i < finalCandidates.length; i++) { | ||
if(finalCandidates[i].runtimeStats['pingMs'] > finalCandidates[0].runtimeStats['pingMs'] + this.secondaryAcceptableLatencyMS) { | ||
// Slice out the array and break | ||
finalCandidates = finalCandidates.slice(i); | ||
break; | ||
} | ||
} | ||
// If no candidates available return an error | ||
if(finalCandidates.length == 0) return new Error("No replica set members available for query"); | ||
// Pick a random server | ||
return finalCandidates[Math.round(Math.random(1000000) * (finalCandidates.length - 1))].checkoutReader(); | ||
} | ||
@@ -63,3 +111,3 @@ | ||
var self = this; | ||
// Ping server function | ||
@@ -83,10 +131,10 @@ var pingFunction = function() { | ||
// Add error listener | ||
db.on("error", function(err) { | ||
db.on("error", function(err) { | ||
// Adjust the number of checks | ||
numberOfEntries = numberOfEntries - 1; | ||
// Close connection | ||
db.close(); | ||
db.close(); | ||
// If we are done with all results coming back trigger ping again | ||
if(numberOfEntries == 0 && self.state == 'connected') { | ||
setTimeout(pingFunction, 1000); | ||
setTimeout(pingFunction, 1000); | ||
} | ||
@@ -107,15 +155,15 @@ }) | ||
// Get end time of the command | ||
var endTime = new Date().getTime(); | ||
var endTime = new Date().getTime(); | ||
// Store the ping time in the server instance state variable, if there is one | ||
if(serverInstance != null && serverInstance.runtimeStats != null && serverInstance.isConnected()) { | ||
serverInstance.runtimeStats['pingMs'] = (endTime - startTime); | ||
serverInstance.runtimeStats['pingMs'] = (endTime - startTime); | ||
} | ||
// Close server | ||
p_db.close(); | ||
p_db.close(); | ||
// If we are done with all results coming back trigger ping again | ||
if(numberOfEntries == 0 && self.state == 'connected') { | ||
setTimeout(pingFunction, 1000); | ||
setTimeout(pingFunction, 1000); | ||
} | ||
}) | ||
}) | ||
} | ||
@@ -126,7 +174,7 @@ }) | ||
} | ||
// Start pingFunction | ||
setTimeout(pingFunction, 1000); | ||
// Do the callback | ||
// Do the callback | ||
callback(null); | ||
} |
@@ -1,2 +0,2 @@ | ||
// The Statistics strategy uses the measure of each end-start time for each | ||
// The Statistics strategy uses the measure of each end-start time for each | ||
// query executed against the db to calculate the mean, variance and standard deviation | ||
@@ -9,7 +9,7 @@ // and pick the server which the lowest mean and deviation | ||
// Starts any needed code | ||
StatisticsStrategy.prototype.start = function(callback) { | ||
StatisticsStrategy.prototype.start = function(callback) { | ||
callback(null, null); | ||
} | ||
StatisticsStrategy.prototype.stop = function(callback) { | ||
StatisticsStrategy.prototype.stop = function(callback) { | ||
// Remove reference to replicaset | ||
@@ -21,22 +21,63 @@ this.replicaset = null; | ||
StatisticsStrategy.prototype.checkoutSecondary = function() { | ||
// Get all secondary server keys | ||
var keys = Object.keys(this.replicaset._state.secondaries); | ||
// Contains the picked instance | ||
var minimumSscore = null; | ||
var selectedInstance = null; | ||
StatisticsStrategy.prototype.checkoutSecondary = function(tags, secondaryCandidates) { | ||
// Servers are picked based on the lowest ping time and then servers that lower than that + secondaryAcceptableLatencyMS | ||
// Create a list of candidat servers, containing the primary if available | ||
var candidateServers = []; | ||
// Pick server key by the lowest ping time | ||
for(var i = 0; i < keys.length; i++) { | ||
// Fetch a server | ||
var server = this.replicaset._state.secondaries[keys[i]]; | ||
// Pick server by lowest Sscore | ||
if(minimumSscore == null || (server.queryStats.sScore < minimumSscore)) { | ||
minimumSscore = server.queryStats.sScore; | ||
selectedInstance = server; | ||
// If we have not provided a list of candidate servers use the default setup | ||
if(!Array.isArray(secondaryCandidates)) { | ||
candidateServers = this.replicaset._state.master != null ? [this.replicaset._state.master] : []; | ||
// Add all the secondaries | ||
var keys = Object.keys(this.replicaset._state.secondaries); | ||
for(var i = 0; i < keys.length; i++) { | ||
candidateServers.push(this.replicaset._state.secondaries[keys[i]]) | ||
} | ||
} else { | ||
candidateServers = secondaryCandidates; | ||
} | ||
// Return the selected instance | ||
return selectedInstance != null ? selectedInstance.checkoutReader() : null; | ||
// Final list of eligable server | ||
var finalCandidates = []; | ||
// If we have tags filter by tags | ||
if(tags != null && typeof tags == 'object') { | ||
// If we have an array or single tag selection | ||
var tagObjects = Array.isArray(tags) ? tags : [tags]; | ||
// Iterate over all tags until we find a candidate server | ||
for(var _i = 0; _i < tagObjects.length; _i++) { | ||
// Grab a tag object | ||
var tagObject = tagObjects[_i]; | ||
// Matching keys | ||
var matchingKeys = Object.keys(tagObject); | ||
// Remove any that are not tagged correctly | ||
for(var i = 0; i < candidateServers.length; i++) { | ||
var server = candidateServers[i]; | ||
// If we have tags match | ||
if(server.tags != null) { | ||
var matching = true; | ||
// Ensure we have all the values | ||
for(var j = 0; j < matchingKeys.length; j++) { | ||
if(server.tags[matchingKeys[j]] != tagObject[matchingKeys[j]]) { | ||
matching = false; | ||
break; | ||
} | ||
} | ||
// If we have a match add it to the list of matching servers | ||
if(matching) { | ||
finalCandidates.push(server); | ||
} | ||
} | ||
} | ||
} | ||
} else { | ||
// Final array candidates | ||
var finalCandidates = candidateServers; | ||
} | ||
// If no candidates available return an error | ||
if(finalCandidates.length == 0) return new Error("No replica set members available for query"); | ||
// Pick a random server | ||
return finalCandidates[Math.round(Math.random(1000000) * (finalCandidates.length - 1))].checkoutReader(); | ||
} |
@@ -10,3 +10,3 @@ var QueryCommand = require('./commands/query_command').QueryCommand, | ||
* Constructor for a cursor object that handles all the operations on query result | ||
* using find. This cursor object is unidirectional and cannot traverse backwards. Clients should not be creating a cursor directly, | ||
* using find. This cursor object is unidirectional and cannot traverse backwards. Clients should not be creating a cursor directly, | ||
* but use find to acquire a cursor. | ||
@@ -27,2 +27,3 @@ * | ||
* @param {Boolean} tailable allow the cursor to be tailable. | ||
* @param {Boolean} awaitdata allow the cursor to wait for data, only applicable for tailable cursor. | ||
* @param {Number} batchSize the number of the subset of results to request the database to return for every request. This should initially be greater than 1 otherwise the database will automatically close the cursor. The batch size can be set to 1 with cursorInstance.batchSize after performing the initial query to the database. | ||
@@ -37,6 +38,9 @@ * @param {Boolean} raw return all query documents as raw buffers (default false). | ||
* @param {String} comment you can put a $comment field on a query to make looking in the profiler logs simpler. | ||
* @param {Boolean} awaitdata allow the cursor to wait for data, only applicable for tailable cursor. | ||
* @param {Number} numberOfRetries if using awaidata specifies the number of times to retry on timeout. | ||
* @param {String} dbName override the default dbName. | ||
*/ | ||
function Cursor(db, collection, selector, fields, skip, limit | ||
, sort, hint, explain, snapshot, timeout, tailable, batchSize, slaveOk, raw, read | ||
, returnKey, maxScan, min, max, showDiskLoc, comment) { | ||
, sort, hint, explain, snapshot, timeout, tailable, batchSize, slaveOk, raw, read | ||
, returnKey, maxScan, min, max, showDiskLoc, comment, awaitdata, numberOfRetries, dbName) { | ||
this.db = db; | ||
@@ -54,2 +58,4 @@ this.collection = collection; | ||
this.tailable = tailable; | ||
this.awaitdata = awaitdata; | ||
this.numberOfRetries = numberOfRetries == null ? 1 : numberOfRetries; | ||
this.batchSizeValue = batchSize == null ? 0 : batchSize; | ||
@@ -65,3 +71,3 @@ this.slaveOk = slaveOk == null ? collection.slaveOk : slaveOk; | ||
this.comment = comment; | ||
this.totalNumberOfRecords = 0; | ||
@@ -71,2 +77,5 @@ this.items = []; | ||
// This name | ||
this.dbName = dbName; | ||
// State variables for the cursor | ||
@@ -77,3 +86,9 @@ this.state = Cursor.INIT; | ||
this.getMoreTimer = false; | ||
this.collectionName = (this.db.databaseName ? this.db.databaseName + "." : '') + this.collection.collectionName; | ||
// If we are using a specific db execute against it | ||
if(this.dbName != null) { | ||
this.collectionName = this.dbName + "." + this.collection.collectionName; | ||
} else { | ||
this.collectionName = (this.db.databaseName ? this.db.databaseName + "." : '') + this.collection.collectionName; | ||
} | ||
}; | ||
@@ -85,3 +100,3 @@ | ||
* | ||
* @return {Cursor} returns itself with rewind applied. | ||
* @return {Cursor} returns itself with rewind applied. | ||
* @api public | ||
@@ -104,3 +119,3 @@ */ | ||
} | ||
return self; | ||
@@ -113,7 +128,7 @@ }; | ||
* is enough memory to store the results. Note that the array only contain partial | ||
* results when this cursor had been previouly accessed. In that case, | ||
* results when this cursor had been previouly accessed. In that case, | ||
* cursor.rewind() can be used to reset the cursor. | ||
* | ||
* @param {Function} callback This will be called after executing this method successfully. The first paramter will contain the Error object if an error occured, or null otherwise. The second paramter will contain an array of BSON deserialized objects as a result of the query. | ||
* @return {null} | ||
* @return {null} | ||
* @api public | ||
@@ -132,4 +147,4 @@ */ | ||
var items = []; | ||
this.each(function(err, item) { | ||
this.each(function(err, item) { | ||
if(err != null) return callback(err, null); | ||
@@ -161,3 +176,3 @@ | ||
* @param {Function} callback this will be called for while iterating every document of the query result. The first paramter will contain the Error object if an error occured, or null otherwise. While the second paramter will contain the document. | ||
* @return {null} | ||
* @return {null} | ||
* @api public | ||
@@ -177,3 +192,3 @@ */ | ||
// Fetch the next object until there is no more objects | ||
self.nextObject(function(err, item) { | ||
self.nextObject(function(err, item) { | ||
if(err != null) return callback(err, null); | ||
@@ -199,3 +214,3 @@ if(item != null) { | ||
* @param {Function} callback this will be after executing this method. The first paramter will contain the Error object if an error occured, or null otherwise. While the second paramter will contain the number of results or null if an error occured. | ||
* @return {null} | ||
* @return {null} | ||
* @api public | ||
@@ -255,6 +270,6 @@ */ | ||
throw new Error("Tailable cursor doesn't support limit"); | ||
} | ||
} | ||
} else if(this.queryRun == true || this.state == Cursor.CLOSED) { | ||
if(callback) { | ||
callback(new Error("Cursor is closed"), null); | ||
callback(new Error("Cursor is closed"), null); | ||
} else { | ||
@@ -266,4 +281,4 @@ throw new Error("Cursor is closed"); | ||
if(callback) { | ||
callback(new Error("limit requires an integer"), null); | ||
} else { | ||
callback(new Error("limit requires an integer"), null); | ||
} else { | ||
throw new Error("limit requires an integer"); | ||
@@ -281,2 +296,26 @@ } | ||
/** | ||
* Sets the read preference for the cursor | ||
* | ||
* @param {String} the read preference for the cursor, one of Server.READ_PRIMARY, Server.READ_SECONDARY, Server.READ_SECONDARY_ONLY | ||
* @param {Function} [callback] this optional callback will be called after executing this method. The first parameter will contain an error object when the read preference given is not a valid number or when the cursor is already closed while the second parameter will contain a reference to this object upon successful execution. | ||
* @return {Cursor} an instance of this object. | ||
* @api public | ||
*/ | ||
Cursor.prototype.setReadPreference = function(readPreference, tags, callback) { | ||
if(typeof tags == 'function') callback = tags; | ||
callback = callback || function(){}; | ||
if(this.queryRun == true || this.state == Cursor.CLOSED) { | ||
callback(new Error("Cannot change read preference on executed query or closed cursor")); | ||
} else if(readPreference == null && readPreference != 'primary' | ||
&& readPreference != 'secondaryOnly' && readPreference != 'secondary') { | ||
callback(new Error("only readPreference of primary, secondary or secondaryOnly supported")); | ||
} else { | ||
this.read = readPreference; | ||
} | ||
return this; | ||
} | ||
/** | ||
* Sets the skip parameter of this cursor to the given value. | ||
@@ -338,3 +377,3 @@ * | ||
/** | ||
* The limit used for the getMore command | ||
* The limit used for the getMore command | ||
* | ||
@@ -349,3 +388,3 @@ * @return {Number} The number of records to request per batch. | ||
var absBatchValue = Math.abs(self.batchSizeValue); | ||
if(absLimitValue > 0) { | ||
@@ -380,2 +419,7 @@ if (absBatchValue > 0) { | ||
self.skipValue = self.limitValue = 0; | ||
// if awaitdata is set | ||
if(self.awaitdata != null) { | ||
queryOptions |= QueryCommand.OPTS_AWAIT_DATA; | ||
} | ||
} | ||
@@ -396,6 +440,5 @@ | ||
// Build special selector | ||
var specialSelector = {'query':self.selector}; | ||
var specialSelector = {'$query':self.selector}; | ||
if(self.sortValue != null) specialSelector['orderby'] = utils.formattedOrderClause(self.sortValue); | ||
if(self.hint != null && self.hint.constructor == Object) specialSelector['$hint'] = self.hint; | ||
if(self.explainValue != null) specialSelector['$explain'] = true; | ||
if(self.snapshot != null) specialSelector['$snapshot'] = true; | ||
@@ -408,2 +451,7 @@ if(self.returnKey != null) specialSelector['$returnKey'] = self.returnKey; | ||
if(self.comment != null) specialSelector['$comment'] = self.comment; | ||
// If we have explain set only return a single document with automatic cursor close | ||
if(self.explainValue != null) { | ||
numberToReturn = (-1)*Math.abs(numberToReturn); | ||
specialSelector['$explain'] = true; | ||
} | ||
@@ -430,3 +478,3 @@ // Return the query | ||
* | ||
* @param sortDirection {String|number} Range of acceptable values: | ||
* @param sortDirection {String|number} Range of acceptable values: | ||
* 'ascending', 'descending', 'asc', 'desc', 1, -1 | ||
@@ -477,3 +525,3 @@ * | ||
} | ||
result = null; | ||
@@ -483,3 +531,11 @@ self.nextObject(callback); | ||
self.db._executeQueryCommand(cmd, {read:self.read, raw:self.raw}, commandHandler); | ||
// If we have no connection set on this cursor check one out | ||
if(self.connection == null) { | ||
// Fetch either a reader or writer dependent on the specified read option | ||
self.connection = this.read == null ? self.db.serverConfig.checkoutWriter() : self.db.serverConfig.checkoutReader(this.read); | ||
} | ||
// Execute the command | ||
self.db._executeQueryCommand(cmd, {raw:self.raw, read:this.read, connection:self.connection}, commandHandler); | ||
// Set the command handler to null | ||
commandHandler = null; | ||
@@ -505,3 +561,3 @@ } else if(self.items.length) { | ||
var limit = 0; | ||
if (!self.tailable && self.limitValue > 0) { | ||
@@ -522,3 +578,4 @@ limit = self.limitValue - self.totalNumberOfRecords; | ||
var options = { read: self.read, raw: self.raw }; | ||
// Set up options | ||
var options = { read: self.read, raw: self.raw, connection:self.connection }; | ||
@@ -528,3 +585,5 @@ // Execute the command | ||
try { | ||
if(err != null) callback(err, null); | ||
if(err != null) { | ||
return callback(err, null); | ||
} | ||
@@ -549,4 +608,14 @@ var isDead = 1 === result.responseFlag && result.cursorId.isZero(); | ||
callback(null, self.items.shift()); | ||
} else if(self.tailable && !isDead && self.awaitdata) { | ||
// Excute the tailable cursor once more, will timeout after ~4 sec if awaitdata used | ||
self.numberOfRetries = self.numberOfRetries - 1; | ||
if(self.numberOfRetries == 0) { | ||
self.close(function() { | ||
callback(new Error("tailable cursor timed out"), null); | ||
}); | ||
} else { | ||
process.nextTick(function() {getMore(self, callback);}); | ||
} | ||
} else if(self.tailable && !isDead) { | ||
self.getMoreTimer = setTimeout(function() {getMore(self, callback);}, 500); | ||
self.getMoreTimer = setTimeout(function() {getMore(self, callback);}, 100); | ||
} else { | ||
@@ -564,6 +633,6 @@ self.close(function() {callback(null, null);}); | ||
} catch(err) { | ||
var handleClose = function() { | ||
var handleClose = function() { | ||
callback(err, null); | ||
}; | ||
self.close(handleClose); | ||
@@ -584,5 +653,7 @@ handleClose = null; | ||
// Create a new cursor and fetch the plan | ||
var cursor = new Cursor(this.db, this.collection, this.selector, this.fields, this.skipValue, limit, | ||
this.sortValue, this.hint, true, this.snapshot, this.timeout, this.tailable, this.batchSizeValue); | ||
// Fetch the explaination document | ||
var cursor = new Cursor(this.db, this.collection, this.selector, this.fields, this.skipValue, limit | ||
, this.sortValue, this.hint, true, this.snapshot, this.timeout, this.tailable, this.batchSizeValue | ||
, this.slaveOk, this.raw, this.read, this.returnKey, this.maxScan, this.min, this.max, this.showDiskLoc | ||
, this.comment, this.awaitdata, this.numberOfRetries, this.dbName); | ||
// Fetch the explaination document | ||
cursor.nextObject(function(err, item) { | ||
@@ -631,3 +702,3 @@ if(err != null) return callback(err, null); | ||
function execute(command) { | ||
self.db._executeQueryCommand(command, {read:self.read, raw:self.raw}, function(err,result) { | ||
self.db._executeQueryCommand(command, {read:self.read, raw:self.raw, connection:self.connection}, function(err,result) { | ||
if(err) { | ||
@@ -645,3 +716,3 @@ stream.emit('error', err); | ||
} | ||
var resflagsMap = { | ||
@@ -653,3 +724,3 @@ CursorNotFound:1<<0, | ||
}; | ||
if(result.documents && result.documents.length && !(result.responseFlag & resflagsMap.QueryFailure)) { | ||
@@ -683,3 +754,3 @@ try { | ||
} | ||
return stream; | ||
@@ -712,6 +783,8 @@ }; | ||
var command = new KillCursorCommand(this.db, [this.cursorId]); | ||
this.db._executeQueryCommand(command, {read:self.read, raw:self.raw}, null); | ||
this.db._executeQueryCommand(command, {read:self.read, raw:self.raw, connection:self.connection}, null); | ||
} catch(err) {} | ||
} | ||
// Null out the connection | ||
self.connection = null; | ||
// Reset cursor id | ||
@@ -742,3 +815,3 @@ this.cursorId = Long.fromInt(0); | ||
* Init state | ||
* | ||
* | ||
* @classconstant INIT | ||
@@ -750,3 +823,3 @@ **/ | ||
* Cursor open | ||
* | ||
* | ||
* @classconstant OPEN | ||
@@ -758,3 +831,3 @@ **/ | ||
* Cursor closed | ||
* | ||
* | ||
* @classconstant CLOSED | ||
@@ -761,0 +834,0 @@ **/ |
@@ -14,3 +14,3 @@ /** | ||
* - **error** {function(err) {}} the error event triggers if an error happens. | ||
* - **end** {function() {}} the end event triggers when there is no more documents available. | ||
* - **close** {function() {}} the end event triggers when there is no more documents available. | ||
* | ||
@@ -17,0 +17,0 @@ * @class Represents a CursorStream. |
@@ -21,3 +21,3 @@ var Binary = require('bson').Binary, | ||
if(!(this instanceof Chunk)) return new Chunk(file, mongoObject); | ||
this.file = file; | ||
@@ -40,3 +40,3 @@ var self = this; | ||
this.data = new Binary(buffer); | ||
} else if(mongoObjectFinal.data instanceof Binary || Object.prototype.toString.call(mongoObjectFinal.data) == "[object Binary]") { | ||
} else if(mongoObjectFinal.data instanceof Binary || Object.prototype.toString.call(mongoObjectFinal.data) == "[object Binary]") { | ||
this.data = mongoObjectFinal.data; | ||
@@ -74,6 +74,6 @@ } else if(Buffer.isBuffer(mongoObjectFinal.data)) { | ||
*/ | ||
Chunk.prototype.read = function(length) { | ||
Chunk.prototype.read = function(length) { | ||
// Default to full read if no index defined | ||
length = length == null || length == 0 ? this.length() : length; | ||
if(this.length() - this.internalPosition + 1 >= length) { | ||
@@ -89,3 +89,3 @@ var data = this.data.read(this.internalPosition, length); | ||
Chunk.prototype.readSlice = function(length) { | ||
if ((this.length() - this.internalPosition + 1) >= length) { | ||
if ((this.length() - this.internalPosition) >= length) { | ||
var data = null; | ||
@@ -147,7 +147,11 @@ if (this.data.buffer != null) { //Pure BSON | ||
self.file.chunkCollection(function(err, collection) { | ||
if(err) return callback(err); | ||
collection.remove({'_id':self.objectId}, {safe:true}, function(err, result) { | ||
if(err) return callback(err); | ||
if(self.data.length() > 0) { | ||
self.buildMongoObject(function(mongoObject) { | ||
self.buildMongoObject(function(mongoObject) { | ||
collection.insert(mongoObject, {safe:true}, function(err, collection) { | ||
callback(null, self); | ||
callback(err, self); | ||
}); | ||
@@ -208,3 +212,3 @@ }); | ||
} | ||
}); | ||
}); | ||
@@ -211,0 +215,0 @@ /** |
@@ -15,3 +15,5 @@ /** | ||
util = require('util'), | ||
ReadStream = require('./readstream').ReadStream; | ||
inherits = util.inherits, | ||
ReadStream = require('./readstream').ReadStream, | ||
Stream = require('stream'); | ||
@@ -31,3 +33,3 @@ var REFERENCE_BY_FILENAME = 0, | ||
* - **root** {String}, root collection to use. Defaults to **{GridStore.DEFAULT_ROOT_COLLECTION}**. | ||
* - **chunk_type** {String}, mime type of the file. Defaults to **{GridStore.DEFAULT_CONTENT_TYPE}**. | ||
* - **content_type** {String}, mime type of the file. Defaults to **{GridStore.DEFAULT_CONTENT_TYPE}**. | ||
* - **chunk_size** {Number}, size for the chunk. Defaults to **{Chunk.DEFAULT_CHUNK_SIZE}**. | ||
@@ -44,26 +46,41 @@ * - **metadata** {Object}, arbitrary data the user wants to store. | ||
*/ | ||
function GridStore(db, id, filename, mode, options) { | ||
var GridStore = function GridStore(db, id, filename, mode, options) { | ||
if(!(this instanceof GridStore)) return new GridStore(db, id, filename, mode, options); | ||
var self = this; | ||
this.db = db; | ||
var _filename = filename; | ||
this.db = db; | ||
if(typeof filename == 'string' && typeof mode == 'string') { | ||
_filename = filename; | ||
} else if(typeof filename == 'string' && typeof mode == 'object' && mode != null) { | ||
var _mode = mode; | ||
// Call stream constructor | ||
Stream.call(this); | ||
// Handle options | ||
if(options == null) options = {}; | ||
// Handle mode | ||
if(mode == null) { | ||
mode = filename; | ||
options = _mode; | ||
_filename = id; | ||
} else if(typeof filename == 'string' && mode == null) { | ||
filename = null; | ||
} else if(typeof mode == 'object') { | ||
options = mode; | ||
mode = filename; | ||
_filename = id; | ||
filename = null; | ||
} | ||
// set grid referencetype | ||
this.referenceBy = typeof id == 'string' ? 0 : 1; | ||
this.filename = _filename; | ||
this.fileId = typeof id == 'string' ? new ObjectID() : id; | ||
// Handle id | ||
if(id instanceof ObjectID && (typeof filename == 'string' || filename == null)) { | ||
this.referenceBy = 1; | ||
this.fileId = id; | ||
this.filename = filename; | ||
} else if(!(id instanceof ObjectID) && typeof id == 'string' && mode.indexOf("w") != null) { | ||
this.referenceBy = 0; | ||
this.fileId = new ObjectID(); | ||
this.filename = id; | ||
} else if(!(id instanceof ObjectID) && typeof id == 'string' && mode.indexOf("r") != null) { | ||
this.referenceBy = 0; | ||
this.filename = filename; | ||
} else { | ||
this.referenceBy = 1; | ||
this.fileId = id; | ||
this.filename = filename; | ||
} | ||
// Set up the rest | ||
@@ -75,3 +92,3 @@ this.mode = mode == null ? "r" : mode; | ||
// Set default chunk size | ||
this.internalChunkSize = this.options['chunkSize'] == null ? Chunk.DEFAULT_CHUNK_SIZE : this.options['chunkSize']; | ||
this.internalChunkSize = this.options['chunkSize'] == null ? Chunk.DEFAULT_CHUNK_SIZE : this.options['chunkSize']; | ||
// Previous chunk size | ||
@@ -82,2 +99,13 @@ this.previousChunkSize = 0; | ||
/** | ||
* Code for the streaming capabilities of the gridstore object | ||
* Most code from Aaron heckmanns project https://github.com/aheckmann/gridfs-stream | ||
* Modified to work on the gridstore object itself | ||
* @ignore | ||
*/ | ||
GridStore.prototype = { __proto__: Stream.prototype } | ||
// Move pipe to _pipe | ||
GridStore.prototype._pipe = GridStore.prototype.pipe; | ||
/** | ||
* Opens the file from the database and initialize this object. Also creates a | ||
@@ -97,11 +125,21 @@ * new one if file does not exist. | ||
var self = this; | ||
if((self.mode == "w" || self.mode == "w+") && self.db.serverConfig.primary != null) { | ||
// Get files collection | ||
self.collection(function(err, collection) { | ||
// Get chunk collection | ||
self.chunkCollection(function(err, chunkCollection) { | ||
// Ensure index on chunk collection | ||
chunkCollection.ensureIndex([['files_id', 1], ['n', 1]], function(err, index) { | ||
_open(self, callback); | ||
if(err) return callback(err); | ||
// Put index on filename | ||
collection.ensureIndex([['filename', 1]], function(err, index) { | ||
if(err) return callback(err); | ||
// Get chunk collection | ||
self.chunkCollection(function(err, chunkCollection) { | ||
if(err) return callback(err); | ||
// Ensure index on chunk collection | ||
chunkCollection.ensureIndex([['files_id', 1], ['n', 1]], function(err, index) { | ||
if(err) return callback(err); | ||
_open(self, callback); | ||
}); | ||
}); | ||
@@ -111,2 +149,3 @@ }); | ||
} else { | ||
// Open the gridstore | ||
_open(self, callback); | ||
@@ -127,3 +166,3 @@ } | ||
} | ||
// Create the query | ||
@@ -136,7 +175,12 @@ var query = self.referenceBy == REFERENCE_BY_ID ? {_id:self.fileId} : {filename:self.filename}; | ||
collection.find(query, function(err, cursor) { | ||
if(err) return error(err); | ||
// Fetch the file | ||
cursor.nextObject(function(err, doc) { | ||
if(err) return error(err); | ||
// Check if the collection for the files exists otherwise prepare the new one | ||
if(doc != null) { | ||
self.fileId = doc._id; | ||
self.filename = doc.filename; | ||
self.contentType = doc.contentType; | ||
@@ -150,4 +194,2 @@ self.internalChunkSize = doc.chunkSize; | ||
} else { | ||
// self.fileId = | ||
// self.fileId = self.fileId instanceof ObjectID ? self.fileId : new ObjectID(); | ||
self.fileId = self.fileId == null ? new ObjectID() : self.fileId; | ||
@@ -162,2 +204,3 @@ self.contentType = exports.GridStore.DEFAULT_CONTENT_TYPE; | ||
nthChunk(self, 0, function(err, chunk) { | ||
if(err) return error(err); | ||
self.currentChunk = chunk; | ||
@@ -170,2 +213,3 @@ self.position = 0; | ||
deleteChunks(self, function(err, result) { | ||
if(err) return error(err); | ||
self.currentChunk = new Chunk(self, {'n':0}); | ||
@@ -180,2 +224,3 @@ self.contentType = self.options['content_type'] == null ? self.contentType : self.options['content_type']; | ||
nthChunk(self, lastChunkNumber(self), function(err, chunk) { | ||
if(err) return error(err); | ||
// Set the current chunk | ||
@@ -197,4 +242,6 @@ self.currentChunk = chunk == null ? new Chunk(self, {'n':0}) : chunk; | ||
self.length = 0; | ||
self.chunkCollection(function(err, collection2) { | ||
if(err) return error(err); | ||
// No file exists set up write mode | ||
@@ -204,2 +251,3 @@ if(self.mode == "w") { | ||
deleteChunks(self, function(err, result) { | ||
if(err) return error(err); | ||
self.currentChunk = new Chunk(self, {'n':0}); | ||
@@ -214,2 +262,3 @@ self.contentType = self.options['content_type'] == null ? self.contentType : self.options['content_type']; | ||
nthChunk(self, lastChunkNumber(self), function(err, chunk) { | ||
if(err) return error(err); | ||
// Set the current chunk | ||
@@ -226,2 +275,8 @@ self.currentChunk = chunk == null ? new Chunk(self, {'n':0}) : chunk; | ||
}); | ||
// only pass error to callback once | ||
function error (err) { | ||
if(error.err) return; | ||
callback(error.err = err); | ||
} | ||
}; | ||
@@ -241,3 +296,3 @@ | ||
fs.open(file, 'r', 0666, function (err, fd) { | ||
// TODO Handle err | ||
if(err) return callback(err); | ||
self.writeFile(fd, callback); | ||
@@ -249,24 +304,34 @@ }); | ||
self.open(function (err, self) { | ||
if(err) return callback(err); | ||
fs.fstat(file, function (err, stats) { | ||
if(err) return callback(err); | ||
var offset = 0; | ||
var index = 0; | ||
var numberOfChunksLeft = Math.min(stats.size / self.chunkSize); | ||
// Write a chunk | ||
var writeChunk = function() { | ||
fs.read(file, self.chunkSize, offset, 'binary', function(err, data, bytesRead) { | ||
if(err) return callback(err); | ||
offset = offset + bytesRead; | ||
// Create a new chunk for the data | ||
var chunk = new Chunk(self, {n:index++}); | ||
chunk.write(data, function(err, chunk) { | ||
if(err) return callback(err); | ||
chunk.save(function(err, result) { | ||
if(err) return callback(err); | ||
self.position = self.position + data.length; | ||
// Point to current chunk | ||
self.currentChunk = chunk; | ||
if(offset >= stats.size) { | ||
fs.close(file); | ||
self.close(function(err, result) { | ||
return callback(null, result); | ||
}) | ||
self.close(callback); | ||
} else { | ||
@@ -279,3 +344,3 @@ return process.nextTick(writeChunk); | ||
} | ||
// Process the first write | ||
@@ -288,21 +353,2 @@ process.nextTick(writeChunk); | ||
/** | ||
* Writes some data. This method will work properly only if initialized with mode "w" or "w+". | ||
* | ||
* @param {String|Buffer} data the data to write. | ||
* @param {Boolean} [close] closes this file after writing if set to true. | ||
* @param {Function} callback this will be called after executing this method. The first parameter will contain null and the second one will contain a reference to this object. | ||
* @return {null} | ||
* @api public | ||
*/ | ||
GridStore.prototype.write = function(data, close, callback) { | ||
// If we have a buffer write it using the writeBuffer method | ||
if(Buffer.isBuffer(data)) { | ||
return writeBuffer(this, data, close, callback); | ||
} else { | ||
// Wrap the string in a buffer and write | ||
return writeBuffer(this, new Buffer(data, 'binary'), close, callback); | ||
} | ||
}; | ||
/** | ||
* Writes some data. This method will work properly only if initialized with mode | ||
@@ -322,8 +368,8 @@ * "w" or "w+". | ||
var writeBuffer = function(self, buffer, close, callback) { | ||
if(typeof close === "function") { callback = close; close = null; } | ||
var finalClose = (close == null) ? false : close; | ||
if(self.mode[0] != "w") { | ||
callback(new Error((self.referenceBy == REFERENCE_BY_ID ? self.toHexString() : self.filename) + " not opened for writing"), null); | ||
} else { | ||
if(typeof close === "function") { callback = close; close = null; } | ||
var finalClose = (close == null) ? false : close; | ||
if(self.mode[0] != "w") { | ||
callback(new Error((self.referenceBy == REFERENCE_BY_ID ? self.toHexString() : self.filename) + " not opened for writing"), null); | ||
} else { | ||
if(self.currentChunk.position + buffer.length >= self.chunkSize) { | ||
@@ -334,3 +380,3 @@ // Write out the current Chunk and then keep writing until we have less data left than a chunkSize left | ||
var leftOverDataSize = self.chunkSize - self.currentChunk.position; | ||
var firstChunkData = buffer.slice(0, leftOverDataSize); | ||
var firstChunkData = buffer.slice(0, leftOverDataSize); | ||
var leftOverData = buffer.slice(leftOverDataSize); | ||
@@ -352,3 +398,3 @@ // A list of chunks to write out | ||
} | ||
// Set current chunk with remaining data | ||
@@ -358,3 +404,3 @@ self.currentChunk = new Chunk(self, {'n': (previousChunkNumber + 1)}); | ||
if(leftOverData.length > 0) self.currentChunk.write(leftOverData); | ||
// Update the position for the gridstore | ||
@@ -368,4 +414,6 @@ self.position = self.position + buffer.length; | ||
chunk.save(function(err, result) { | ||
if(err) return callback(err); | ||
numberOfChunksToWrite = numberOfChunksToWrite - 1; | ||
if(numberOfChunksToWrite <= 0) { | ||
@@ -375,3 +423,3 @@ return callback(null, self); | ||
}) | ||
} | ||
} | ||
} else { | ||
@@ -382,3 +430,3 @@ // Update the position for the gridstore | ||
self.currentChunk.write(buffer); | ||
callback(null, self); | ||
callback(null, self); | ||
} | ||
@@ -394,3 +442,3 @@ } | ||
* the structure: | ||
* | ||
* | ||
* <pre><code> | ||
@@ -423,9 +471,5 @@ * { | ||
} | ||
// console.log("============================== self.currentChunk.chunkNumber :: " + self.currentChunk.chunkNumber) | ||
// console.log("============================== self.currentChunk.position :: " + self.currentChunk.position) | ||
// console.log(self.position) | ||
// Calcuate the length | ||
var length = self.currentChunk != null ? (chunkNumber * self.chunkSize + previousChunkSize) : 0; | ||
var length = self.currentChunk != null ? (chunkNumber * self.chunkSize + previousChunkSize) : 0; | ||
var mongoObject = { | ||
@@ -464,8 +508,14 @@ '_id': self.fileId, | ||
self.currentChunk.save(function(err, chunk) { | ||
if(err) return callback(err); | ||
self.collection(function(err, files) { | ||
if(err) return callback(err); | ||
// Build the mongo object | ||
if(self.uploadDate != null) { | ||
files.remove({'_id':self.fileId}, {safe:true}, function(err, collection) { | ||
if(err) return callback(err); | ||
buildMongoObject(self, function(mongoObject) { | ||
files.save(mongoObject, {safe:true}, function(err, doc) { | ||
files.save(mongoObject, {safe:true}, function(err) { | ||
callback(err, mongoObject); | ||
@@ -478,3 +528,3 @@ }); | ||
buildMongoObject(self, function(mongoObject) { | ||
files.save(mongoObject, {safe:true}, function(err, doc) { | ||
files.save(mongoObject, {safe:true}, function(err) { | ||
callback(err, mongoObject); | ||
@@ -488,5 +538,7 @@ }); | ||
self.collection(function(err, files) { | ||
if(err) return callback(err); | ||
self.uploadDate = new Date(); | ||
buildMongoObject(self, function(mongoObject) { | ||
files.save(mongoObject, {safe:true}, function(err, doc) { | ||
files.save(mongoObject, {safe:true}, function(err) { | ||
callback(err, mongoObject); | ||
@@ -518,4 +570,10 @@ }); | ||
self.chunkCollection(function(err, collection) { | ||
if(err) return callback(err); | ||
collection.find({'files_id':self.fileId, 'n':chunkNumber}, function(err, cursor) { | ||
if(err) return callback(err); | ||
cursor.nextObject(function(err, chunk) { | ||
if(err) return callback(err); | ||
var finalChunk = chunk == null ? {} : chunk; | ||
@@ -570,6 +628,5 @@ callback(null, new Chunk(self, finalChunk)); | ||
self.chunkCollection(function(err, collection) { | ||
if(err!==null) { | ||
callback(err, false); | ||
} | ||
if(err) return callback(err, false); | ||
collection.remove({'files_id':self.fileId}, {safe:true}, function(err, result) { | ||
if(err) return callback(err, false); | ||
callback(null, true); | ||
@@ -594,13 +651,13 @@ }); | ||
if(err!==null) { | ||
callback("at deleteChunks: "+err); | ||
return; | ||
err.message = "at deleteChunks: " + err.message; | ||
return callback(err); | ||
} | ||
self.collection(function(err, collection) { | ||
if(err!==null) { | ||
callback("at collection: "+err); | ||
return; | ||
err.message = "at collection: " + err.message; | ||
return callback(err); | ||
} | ||
collection.remove({'_id':self.fileId}, {safe:true}, function(err, collection) { | ||
collection.remove({'_id':self.fileId}, {safe:true}, function(err) { | ||
callback(err, self); | ||
@@ -636,3 +693,5 @@ }); | ||
this.read(function(err, data) { | ||
this.read(function(err, data) { | ||
if(err) return callback(err); | ||
var items = data.toString().split(separator); | ||
@@ -643,3 +702,3 @@ items = items.length > 0 ? items.splice(0, items.length - 1) : []; | ||
} | ||
callback(null, items); | ||
@@ -663,2 +722,3 @@ }); | ||
deleteChunks(self, function(err, gridStore) { | ||
if(err) return callback(err); | ||
self.currentChunk = new Chunk(self, {'n': 0}); | ||
@@ -670,2 +730,3 @@ self.position = 0; | ||
self.currentChunk(0, function(err, chunk) { | ||
if(err) return callback(err); | ||
self.currentChunk = chunk; | ||
@@ -713,3 +774,3 @@ self.currentChunk.rewind(); | ||
if((self.currentChunk.length() - self.currentChunk.position + 1 + finalBuffer._index) >= finalLength) { | ||
if((self.currentChunk.length() - self.currentChunk.position + finalBuffer._index) >= finalLength) { | ||
var slice = self.currentChunk.readSlice(finalLength - finalBuffer._index); | ||
@@ -725,3 +786,2 @@ // Copy content to final buffer | ||
} else { | ||
// console.dir(self.currentChunk) | ||
var slice = self.currentChunk.readSlice(self.currentChunk.length() - self.currentChunk.position); | ||
@@ -733,4 +793,6 @@ // Copy content to final buffer | ||
// Load next chunk and read more | ||
// Load next chunk and read more | ||
nthChunk(self, self.currentChunk.chunkNumber + 1, function(err, chunk) { | ||
if(err) return callback(err); | ||
if(chunk.length() > 0) { | ||
@@ -740,3 +802,7 @@ self.currentChunk = chunk; | ||
} else { | ||
finalBuffer._index > 0 ? callback(null, finalBuffer) : callback(new Error("no chunks found for file, possibly corrupt"), null); | ||
if (finalBuffer._index > 0) { | ||
callback(null, finalBuffer) | ||
} else { | ||
callback(new Error("no chunks found for file, possibly corrupt"), null); | ||
} | ||
} | ||
@@ -799,3 +865,3 @@ }); | ||
self.currentChunk.position = (self.position % self.chunkSize); | ||
callback(null, self); | ||
callback(err, self); | ||
}); | ||
@@ -805,3 +871,4 @@ }; | ||
if(self.mode[0] == 'w') { | ||
self.currentChunk.save(function(err, chunk) { | ||
self.currentChunk.save(function(err) { | ||
if(err) return callback(err); | ||
seekChunk(); | ||
@@ -845,3 +912,3 @@ }); | ||
self.position = self.position + 1; | ||
callback(null, self.currentChunk.getc()); | ||
callback(err, self.currentChunk.getc()); | ||
}); | ||
@@ -887,3 +954,3 @@ } else { | ||
* The collection to be used for holding the files and chunks collection. | ||
* | ||
* | ||
* @classconstant DEFAULT_ROOT_COLLECTION | ||
@@ -895,3 +962,3 @@ **/ | ||
* Default file mime type | ||
* | ||
* | ||
* @classconstant DEFAULT_CONTENT_TYPE | ||
@@ -903,3 +970,3 @@ **/ | ||
* Seek mode where the given length is absolute. | ||
* | ||
* | ||
* @classconstant IO_SEEK_SET | ||
@@ -911,3 +978,3 @@ **/ | ||
* Seek mode where the given length is an offset to the current read/write head. | ||
* | ||
* | ||
* @classconstant IO_SEEK_CUR | ||
@@ -919,3 +986,3 @@ **/ | ||
* Seek mode where the given length is an offset to the end of the file. | ||
* | ||
* | ||
* @classconstant IO_SEEK_END | ||
@@ -943,7 +1010,14 @@ **/ | ||
db.collection(rootCollectionFinal + ".files", function(err, collection) { | ||
if(err) return callback(err); | ||
// Build query | ||
var query = (typeof fileIdObject == 'string' || Object.prototype.toString.call(fileIdObject) == '[object RegExp]' ) | ||
? {'filename':fileIdObject} : {'_id':fileIdObject}; // Attempt to locate file | ||
? {'filename':fileIdObject} | ||
: {'_id':fileIdObject}; // Attempt to locate file | ||
collection.find(query, function(err, cursor) { | ||
if(err) return callback(err); | ||
cursor.nextObject(function(err, item) { | ||
if(err) return callback(err); | ||
callback(null, item == null ? false : true); | ||
@@ -975,3 +1049,3 @@ }); | ||
} | ||
// Check if we are returning by id not filename | ||
@@ -983,10 +1057,14 @@ var byId = options['id'] != null ? options['id'] : false; | ||
db.collection((rootCollectionFinal + ".files"), function(err, collection) { | ||
if(err) return callback(err); | ||
collection.find(function(err, cursor) { | ||
cursor.each(function(err, item) { | ||
if(item != null) { | ||
items.push(byId ? item._id : item.filename); | ||
} else { | ||
callback(null, items); | ||
} | ||
}); | ||
if(err) return callback(err); | ||
cursor.each(function(err, item) { | ||
if(item != null) { | ||
items.push(byId ? item._id : item.filename); | ||
} else { | ||
callback(err, items); | ||
} | ||
}); | ||
}); | ||
@@ -1023,2 +1101,3 @@ }); | ||
new GridStore(db, name, "r", options).open(function(err, gridStore) { | ||
if(err) return callback(err); | ||
// Make sure we are not reading out of bounds | ||
@@ -1028,13 +1107,10 @@ if(offset && offset >= gridStore.length) return callback("offset larger than size of file", null); | ||
if(offset && length && (offset + length) > gridStore.length) return callback("offset and length is larger than the size of the file", null); | ||
if(offset != null) { | ||
gridStore.seek(offset, function(err, gridStore) { | ||
gridStore.read(length, function(err, data) { | ||
callback(err, data); | ||
}); | ||
if(err) return callback(err); | ||
gridStore.read(length, callback); | ||
}); | ||
} else { | ||
gridStore.read(length, function(err, data) { | ||
callback(err, data); | ||
}); | ||
gridStore.read(length, callback); | ||
} | ||
@@ -1063,5 +1139,4 @@ }); | ||
new GridStore(db, name, "r", options).open(function(err, gridStore) { | ||
gridStore.readlines(finalSeperator, function(err, lines) { | ||
callback(err, lines); | ||
}); | ||
if(err) return callback(err); | ||
gridStore.readlines(finalSeperator, callback); | ||
}); | ||
@@ -1098,4 +1173,7 @@ }; | ||
new GridStore(db, names, "w", options).open(function(err, gridStore) { | ||
if(err) return callback(err); | ||
deleteChunks(gridStore, function(err, result) { | ||
if(err) return callback(err); | ||
gridStore.collection(function(err, collection) { | ||
if(err) return callback(err); | ||
collection.remove({'_id':gridStore.fileId}, {safe:true}, function(err, collection) { | ||
@@ -1112,3 +1190,3 @@ callback(err, self); | ||
* Returns the current chunksize of the file. | ||
* | ||
* | ||
* @field chunkSize | ||
@@ -1135,3 +1213,3 @@ * @type {Number} | ||
* The md5 checksum for this file. | ||
* | ||
* | ||
* @field md5 | ||
@@ -1150,5 +1228,289 @@ * @type {Number} | ||
/** | ||
* GridStore Streaming methods | ||
* Handles the correct return of the writeable stream status | ||
* @ignore | ||
*/ | ||
Object.defineProperty(GridStore.prototype, "writable", { enumerable: true | ||
, get: function () { | ||
if(this._writeable == null) { | ||
this._writeable = this.mode != null && this.mode.indexOf("w") != -1; | ||
} | ||
// Return the _writeable | ||
return this._writeable; | ||
} | ||
, set: function(value) { | ||
this._writeable = value; | ||
} | ||
}); | ||
/** | ||
* Handles the correct return of the readable stream status | ||
* @ignore | ||
*/ | ||
Object.defineProperty(GridStore.prototype, "readable", { enumerable: true | ||
, get: function () { | ||
if(this._readable == null) { | ||
this._readable = this.mode != null && this.mode.indexOf("r") != -1; | ||
} | ||
return this._readable; | ||
} | ||
, set: function(value) { | ||
this._readable = value; | ||
} | ||
}); | ||
GridStore.prototype.paused; | ||
/** | ||
* Handles the correct setting of encoding for the stream | ||
* @ignore | ||
*/ | ||
GridStore.prototype.setEncoding = fs.ReadStream.prototype.setEncoding; | ||
/** | ||
* Handles the end events | ||
* @ignore | ||
*/ | ||
GridStore.prototype.end = function end(data) { | ||
var self = this; | ||
// allow queued data to write before closing | ||
if(!this.writable) return; | ||
this.writable = false; | ||
if(data) { | ||
this._q.push(data); | ||
} | ||
this.on('drain', function () { | ||
self.close(function (err) { | ||
if (err) return _error(self, err); | ||
self.emit('close'); | ||
}); | ||
}); | ||
_flush(self); | ||
} | ||
/** | ||
* Handles the normal writes to gridstore | ||
* @ignore | ||
*/ | ||
var _writeNormal = function(self, data, close, callback) { | ||
// If we have a buffer write it using the writeBuffer method | ||
if(Buffer.isBuffer(data)) { | ||
return writeBuffer(self, data, close, callback); | ||
} else { | ||
// Wrap the string in a buffer and write | ||
return writeBuffer(self, new Buffer(data, 'binary'), close, callback); | ||
} | ||
} | ||
/** | ||
* Writes some data. This method will work properly only if initialized with mode "w" or "w+". | ||
* | ||
* @param {String|Buffer} data the data to write. | ||
* @param {Boolean} [close] closes this file after writing if set to true. | ||
* @param {Function} callback this will be called after executing this method. The first parameter will contain null and the second one will contain a reference to this object. | ||
* @return {null} | ||
* @api public | ||
*/ | ||
GridStore.prototype.write = function write(data, close, callback) { | ||
// If it's a normal write delegate the call | ||
if(typeof close == 'function' || typeof callback == 'function') { | ||
return _writeNormal(this, data, close, callback); | ||
} | ||
// Otherwise it's a stream write | ||
var self = this; | ||
if (!this.writable) { | ||
throw new Error('GridWriteStream is not writable'); | ||
} | ||
// queue data until we open. | ||
if (!this._opened) { | ||
// Set up a queue to save data until gridstore object is ready | ||
this._q = []; | ||
_openStream(self); | ||
this._q.push(data); | ||
return false; | ||
} | ||
// Push data to queue | ||
this._q.push(data); | ||
_flush(this); | ||
// Return write successful | ||
return true; | ||
} | ||
/** | ||
* Handles the destroy part of a stream | ||
* @ignore | ||
*/ | ||
GridStore.prototype.destroy = function destroy() { | ||
// close and do not emit any more events. queued data is not sent. | ||
if(!this.writable) return; | ||
this.readable = false; | ||
if(this.writable) { | ||
this.writable = false; | ||
this._q.length = 0; | ||
this.emit('close'); | ||
} | ||
} | ||
/** | ||
* Handles the destroySoon part of a stream | ||
* @ignore | ||
*/ | ||
GridStore.prototype.destroySoon = function destroySoon() { | ||
// as soon as write queue is drained, destroy. | ||
// may call destroy immediately if no data is queued. | ||
if(!this._q.length) { | ||
return this.destroy(); | ||
} | ||
this._destroying = true; | ||
} | ||
/** | ||
* Handles the pipe part of the stream | ||
* @ignore | ||
*/ | ||
GridStore.prototype.pipe = function(destination, options) { | ||
var self = this; | ||
// Open the gridstore | ||
this.open(function(err, result) { | ||
if(err) _errorRead(self, err); | ||
if(!self.readable) return; | ||
// Set up the pipe | ||
self._pipe(destination, options); | ||
// Emit the stream is open | ||
self.emit('open'); | ||
// Read from the stream | ||
_read(self); | ||
}) | ||
} | ||
/** | ||
* Internal module methods | ||
* @ignore | ||
*/ | ||
var _read = function _read(self) { | ||
if (!self.readable || self.paused || self.reading) { | ||
return; | ||
} | ||
self.reading = true; | ||
var stream = self._stream = self.stream(); | ||
stream.paused = self.paused; | ||
stream.on('data', function (data) { | ||
if (self._decoder) { | ||
var str = self._decoder.write(data); | ||
if (str.length) self.emit('data', str); | ||
} else { | ||
self.emit('data', data); | ||
} | ||
}); | ||
stream.on('end', function (data) { | ||
self.emit('end', data); | ||
}); | ||
stream.on('error', function (data) { | ||
_errorRead(self, data); | ||
}); | ||
stream.on('close', function (data) { | ||
self.emit('close', data); | ||
}); | ||
self.pause = function () { | ||
// native doesn't always pause. | ||
// bypass its pause() method to hack it | ||
self.paused = stream.paused = true; | ||
} | ||
self.resume = function () { | ||
self.paused = false; | ||
stream.resume(); | ||
self.readable = stream.readable; | ||
} | ||
self.destroy = function () { | ||
self.readable = false; | ||
stream.destroy(); | ||
} | ||
} | ||
/** | ||
* pause | ||
* @ignore | ||
*/ | ||
GridStore.prototype.pause = function pause () { | ||
// Overridden when the GridStore opens. | ||
this.paused = true; | ||
} | ||
/** | ||
* resume | ||
* @ignore | ||
*/ | ||
GridStore.prototype.resume = function resume () { | ||
// Overridden when the GridStore opens. | ||
this.paused = false; | ||
} | ||
/** | ||
* Internal module methods | ||
* @ignore | ||
*/ | ||
var _flush = function _flush(self, _force) { | ||
if (!self._opened) return; | ||
if (!_force && self._flushing) return; | ||
self._flushing = true; | ||
// write the entire q to gridfs | ||
if (!self._q.length) { | ||
self._flushing = false; | ||
self.emit('drain'); | ||
if(self._destroying) { | ||
self.destroy(); | ||
} | ||
return; | ||
} | ||
self.write(self._q.shift(), function (err, store) { | ||
if (err) return _error(self, err); | ||
self.emit('progress', store.position); | ||
_flush(self, true); | ||
}); | ||
} | ||
var _openStream = function _openStream (self) { | ||
if(self._opening == true) return; | ||
self._opening = true; | ||
// Open the store | ||
self.open(function (err, gridstore) { | ||
if (err) return _error(self, err); | ||
self._opened = true; | ||
self.emit('open'); | ||
_flush(self); | ||
}); | ||
} | ||
var _error = function _error(self, err) { | ||
self.destroy(); | ||
self.emit('error', err); | ||
} | ||
var _errorRead = function _errorRead (self, err) { | ||
self.readable = false; | ||
self.emit('error', err); | ||
} | ||
/** | ||
* @ignore | ||
* @api private | ||
*/ | ||
exports.GridStore = GridStore; |
@@ -29,2 +29,3 @@ var Stream = require('stream').Stream, | ||
this.completedLength = 0; | ||
this.currentChunkNumber = 0; | ||
@@ -36,2 +37,5 @@ this.paused = false; | ||
// Calculate the number of chunks | ||
this.numberOfChunks = Math.ceil(gstore.length/gstore.chunkSize); | ||
var self = this; | ||
@@ -77,13 +81,10 @@ process.nextTick(function() { | ||
if ((gstore.currentChunk.length() - gstore.currentChunk.position + 1 + self.completedLength) >= self.finalLength) { | ||
toRead = self.finalLength - self.completedLength; | ||
self.executing = false; | ||
last = true; | ||
} else { | ||
toRead = gstore.currentChunk.length(); | ||
if(gstore.currentChunk.chunkNumber >= (this.numberOfChunks - 1)) { | ||
self.executing = false; | ||
last = true; | ||
} | ||
var data = gstore.currentChunk.readSlice(toRead); | ||
if(data != null) { | ||
var data = gstore.currentChunk.readSlice(gstore.currentChunk.length()); | ||
if(data != null && gstore.currentChunk.chunkNumber == self.currentChunkNumber) { | ||
self.currentChunkNumber = self.currentChunkNumber + 1; | ||
self.completedLength += data.length; | ||
@@ -129,3 +130,3 @@ self.pendingChunk = null; | ||
gstore.currentChunk = self.pendingChunk; | ||
self._execute(); | ||
self._execute(); | ||
}); | ||
@@ -169,16 +170,10 @@ } | ||
} | ||
this.paused = false; | ||
var self = this; | ||
if(self.pendingChunk != null) { | ||
self.currentChunk = self.pendingChunk; | ||
process.nextTick(function() { | ||
self._execute(); | ||
}); | ||
} else { | ||
self.readable = false; | ||
self.emit("close"); | ||
} | ||
process.nextTick(function() { | ||
self._execute(); | ||
}); | ||
}; | ||
exports.ReadStream = ReadStream; |
@@ -19,4 +19,6 @@ try { | ||
, 'collection' | ||
, 'connection/read_preference' | ||
, 'connection/connection' | ||
, 'connection/server' | ||
, 'connection/mongos' | ||
, 'connection/repl_set' | ||
@@ -67,4 +69,6 @@ , 'cursor' | ||
, 'collection' | ||
, 'connection/read_preference' | ||
, 'connection/connection' | ||
, 'connection/server' | ||
, 'connection/mongos' | ||
, 'connection/repl_set' | ||
@@ -119,4 +123,6 @@ , 'cursor' | ||
, 'collection' | ||
, 'connection/read_preference' | ||
, 'connection/connection' | ||
, 'connection/server' | ||
, 'connection/mongos' | ||
, 'connection/repl_set' | ||
@@ -123,0 +129,0 @@ , 'cursor' |
@@ -84,6 +84,10 @@ var Long = require('bson').Long; | ||
} else { | ||
// Parse documents | ||
_self.index = bson.deserializeStream(binary_reply, _self.index, _batchSize, _self.documents, object_index); | ||
// Adjust index | ||
object_index = object_index + _batchSize; | ||
try { | ||
// Parse documents | ||
_self.index = bson.deserializeStream(binary_reply, _self.index, _batchSize, _self.documents, object_index); | ||
// Adjust index | ||
object_index = object_index + _batchSize; | ||
} catch (err) { | ||
return callback(err); | ||
} | ||
} | ||
@@ -102,19 +106,24 @@ | ||
}(this, binary_reply, batchSize, this.numberReturned)(); | ||
} else { | ||
// Let's unpack all the bson documents, deserialize them and store them | ||
for(var object_index = 0; object_index < this.numberReturned; object_index++) { | ||
// Read the size of the bson object | ||
var bsonObjectSize = binary_reply[this.index] | binary_reply[this.index + 1] << 8 | binary_reply[this.index + 2] << 16 | binary_reply[this.index + 3] << 24; | ||
// If we are storing the raw responses to pipe straight through | ||
if(raw) { | ||
// Deserialize the object and add to the documents array | ||
this.documents.push(binary_reply.slice(this.index, this.index + bsonObjectSize)); | ||
} else { | ||
// Deserialize the object and add to the documents array | ||
this.documents.push(bson.deserialize(binary_reply.slice(this.index, this.index + bsonObjectSize))); | ||
} | ||
// Adjust binary index to point to next block of binary bson data | ||
this.index = this.index + bsonObjectSize; | ||
} | ||
} else { | ||
try { | ||
// Let's unpack all the bson documents, deserialize them and store them | ||
for(var object_index = 0; object_index < this.numberReturned; object_index++) { | ||
// Read the size of the bson object | ||
var bsonObjectSize = binary_reply[this.index] | binary_reply[this.index + 1] << 8 | binary_reply[this.index + 2] << 16 | binary_reply[this.index + 3] << 24; | ||
// If we are storing the raw responses to pipe straight through | ||
if(raw) { | ||
// Deserialize the object and add to the documents array | ||
this.documents.push(binary_reply.slice(this.index, this.index + bsonObjectSize)); | ||
} else { | ||
// Deserialize the object and add to the documents array | ||
this.documents.push(bson.deserialize(binary_reply.slice(this.index, this.index + bsonObjectSize))); | ||
} | ||
// Adjust binary index to point to next block of binary bson data | ||
this.index = this.index + bsonObjectSize; | ||
} | ||
} catch(err) { | ||
return callback(err); | ||
} | ||
// No error return | ||
callback(null); | ||
@@ -121,0 +130,0 @@ } |
{ "name" : "mongodb" | ||
, "description" : "A node.js driver for MongoDB" | ||
, "keywords" : ["mongodb", "mongo", "driver", "db"] | ||
, "version" : "1.0.2" | ||
, "version" : "1.1.0-beta" | ||
, "author" : "Christian Amor Kvalheim <christkv@gmail.com>" | ||
@@ -61,3 +61,3 @@ , "contributors" : [ "Aaron Heckmann", | ||
, "dependencies" : { | ||
"bson": "0.0.6" | ||
"bson": "0.1.0" | ||
} | ||
@@ -68,3 +68,3 @@ , "devDependencies": { | ||
, "ejs": "0.6.1" | ||
, "nodeunit": "0.7.3" | ||
, "nodeunit": "0.7.4" | ||
, "github3": ">=0.3.0" | ||
@@ -74,2 +74,3 @@ , "markdown": "0.3.1" | ||
, "step": "0.0.5" | ||
, "async": "0.1.22" | ||
} | ||
@@ -76,0 +77,0 @@ , "config": { "native" : false } |
Sorry, the diff of this file is too big to display
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Non-existent author
Supply chain riskThe package was published by an npm account that no longer exists.
Found 1 instance in 1 package
No v1
QualityPackage is not semver >=1. This means it is not stable and does not support ^ ranges.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Deprecated
MaintenanceThe maintainer of the package marked it as deprecated. This could indicate that a single version should not be used, or that the package is no longer maintained and any new vulnerabilities will not be fixed.
Found 1 instance in 1 package
No License Found
License(Experimental) License information could not be found.
Found 1 instance in 1 package
2486713
0
10584
0
7
9
38
2
1
+ Addedbson@0.1.0(transitive)
- Removedbson@0.0.6(transitive)
Updatedbson@0.1.0