Socket
Socket
Sign inDemoInstall

mongodb

Package Overview
Dependencies
Maintainers
1
Versions
562
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

mongodb - npm Package Compare versions

Comparing version 1.0.2 to 1.1.0-beta

lib/mongodb/connection/mongos.js

199

lib/mongodb/admin.js

@@ -15,5 +15,4 @@ /*!

*/
function Admin(db) {
function Admin(db) {
if(!(this instanceof Admin)) return new Admin(db);
this.db = db;

@@ -25,3 +24,3 @@ };

* instance of the db client
*
*
* @param {Function} callback Callback function of format `function(err, result) {}`.

@@ -38,3 +37,3 @@ * @return {null} Returns no result

* instance of the db client
*
*
* @param {Function} callback Callback function of format `function(err, result) {}`.

@@ -45,5 +44,3 @@ * @return {null} Returns no result

Admin.prototype.serverInfo = function(callback) {
var self = this;
var command = {buildinfo:1};
this.command(command, function(err, doc) {
this.db.executeDbAdminCommand({buildinfo:1}, function(err, doc) {
if(err != null) return callback(err, null);

@@ -64,11 +61,8 @@ return callback(null, doc.documents[0]);

this.command({serverStatus: 1}, function(err, result) {
if (err == null && result.documents[0].ok == 1) {
callback(null, result.documents[0]);
this.db.executeDbAdminCommand({serverStatus: 1}, function(err, doc) {
if(err == null && doc.documents[0].ok === 1) {
callback(null, doc.documents[0]);
} else {
if (err) {
callback(err, false);
} else {
callback(self.wrap(result.documents[0]), false);
}
if(err) return callback(err, false);
return callback(self.wrap(doc.documents[0]), false);
}

@@ -80,3 +74,3 @@ });

* Retrieve the current profiling Level for MongoDB
*
*
* @param {Function} callback Callback function of format `function(err, result) {}`.

@@ -88,18 +82,12 @@ * @return {null} Returns no result

var self = this;
var command = {profile:-1};
this.command(command, function(err, doc) {
this.db.executeDbAdminCommand({profile:-1}, function(err, doc) {
doc = doc.documents[0];
if(err == null && (doc.ok == 1 || typeof doc.was === 'number')) {
if(err == null && doc.ok === 1) {
var was = doc.was;
if(was == 0) {
callback(null, "off");
} else if(was == 1) {
callback(null, "slow_only");
} else if(was == 2) {
callback(null, "all");
} else {
callback(new Error("Error: illegal profiling level value " + was), null);
}
if(was == 0) return callback(null, "off");
if(was == 1) return callback(null, "slow_only");
if(was == 2) return callback(null, "all");
return callback(new Error("Error: illegal profiling level value " + was), null);
} else {

@@ -114,3 +102,2 @@ err != null ? callback(err, null) : callback(new Error("Error with profile command"), null);

*
* @param {Object} [options] Optional parameters to the command.
* @param {Function} callback Callback function of format `function(err, result) {}`.

@@ -124,11 +111,4 @@ * @return {null} Returns no result

callback = args.pop();
options = args.length ? args.shift() : {};
// Set self
var self = this;
var databaseName = this.db.databaseName;
this.db.databaseName = 'admin';
this.db.executeDbCommand({ping:1}, options, function(err, result) {
self.db.databaseName = databaseName;
return callback(err, result);
})
this.db.executeDbAdminCommand({ping: 1}, callback);
}

@@ -138,3 +118,3 @@

* Authenticate against MongoDB
*
*
* @param {String} username The user name for the authentication.

@@ -147,8 +127,4 @@ * @param {String} password The password for the authentication.

Admin.prototype.authenticate = function(username, password, callback) {
var self = this;
var databaseName = this.db.databaseName;
this.db.databaseName = 'admin';
this.db.authenticate(username, password, function(err, result) {
self.db.databaseName = databaseName;
return callback(err, result);
this.db.authenticate(username, password, {authdb: 'admin'}, function(err, doc) {
return callback(err, doc);
})

@@ -160,3 +136,3 @@ }

*
* @param {Object} [options] Optional parameters to the command.
* @param {Object} [options] Optional parameters to the command.
* @param {Function} callback Callback function of format `function(err, result) {}`.

@@ -167,10 +143,5 @@ * @return {null} Returns no result

Admin.prototype.logout = function(callback) {
var self = this;
var databaseName = this.db.databaseName;
this.db.databaseName = 'admin';
this.db.logout(function(err, result) {
return callback(err, result);
})
self.db.databaseName = databaseName;
this.db.logout({authdb: 'admin'}, function(err, doc) {
return callback(err, doc);
})
}

@@ -193,3 +164,2 @@

Admin.prototype.addUser = function(username, password, options, callback) {
var self = this;
var args = Array.prototype.slice.call(arguments, 2);

@@ -199,9 +169,7 @@ callback = args.pop();

var self = this;
var databaseName = this.db.databaseName;
this.db.databaseName = 'admin';
this.db.addUser(username, password, options, function(err, result) {
self.db.databaseName = databaseName;
return callback(err, result);
})
options.dbName = 'admin';
// Add user
this.db.addUser(username, password, options, function(err, doc) {
return callback(err, doc);
})
}

@@ -227,9 +195,7 @@

var self = this;
var databaseName = this.db.databaseName;
this.db.databaseName = 'admin';
this.db.removeUser(username, options, function(err, result) {
self.db.databaseName = databaseName;
return callback(err, result);
})
options.dbName = 'admin';
// Remove the user
this.db.removeUser(username, options, function(err, doc) {
return callback(err, doc);
})
}

@@ -239,3 +205,3 @@

* Set the current profiling level of MongoDB
*
*
* @param {String} level The new profiling level (off, slow_only, all)

@@ -262,12 +228,10 @@ * @param {Function} callback Callback function of format `function(err, result) {}`.

// Set up the profile number
command['profile'] = profile;
// Execute the command to set the profiling level
this.command(command, function(err, doc) {
command['profile'] = profile;
this.db.executeDbAdminCommand(command, function(err, doc) {
doc = doc.documents[0];
if(err == null && (doc.ok == 1 || typeof doc.was === 'number')) {
if(err == null && doc.ok === 1)
return callback(null, level);
} else {
return err != null ? callback(err, null) : callback(new Error("Error with profile command"), null);
}
return err != null ? callback(err, null) : callback(new Error("Error with profile command"), null);
});

@@ -278,3 +242,3 @@ };

* Retrive the current profiling information for MongoDB
*
*
* @param {Function} callback Callback function of format `function(err, result) {}`.

@@ -285,15 +249,11 @@ * @return {null} Returns no result

Admin.prototype.profilingInfo = function(callback) {
var self = this;
var databaseName = this.db.databaseName;
this.db.databaseName = 'admin';
try {
new Cursor(this.db, new Collection(this.db, DbCommand.SYSTEM_PROFILE_COLLECTION), {}).toArray(function(err, items) {
return callback(err, items);
});
new Cursor(this.db, new Collection(this.db, DbCommand.SYSTEM_PROFILE_COLLECTION), {}, null, null, null
, null, null, null, null, null, null, null, null, null, null
, null, null, null, null, null, null, null, null, 'admin').toArray(function(err, items) {
return callback(err, items);
});
} catch (err) {
return callback(err, null);
}
self.db.databaseName = databaseName;
};

@@ -303,5 +263,5 @@

* Execute a db command against the Admin database
*
*
* @param {Object} command A command object `{ping:1}`.
* @param {Object} [options] Optional parameters to the command.
* @param {Object} [options] Optional parameters to the command.
* @param {Function} callback Callback function of format `function(err, result) {}`.

@@ -318,5 +278,5 @@ * @return {null} Returns no result

// Execute a command
this.db.executeDbAdminCommand(command, options, function(err, result) {
this.db.executeDbAdminCommand(command, options, function(err, doc) {
// Ensure change before event loop executes
return callback != null ? callback(err, result) : null;
return callback != null ? callback(err, doc) : null;
});

@@ -327,5 +287,5 @@ }

* Validate an existing collection
*
*
* @param {String} collectionName The name of the collection to validate.
* @param {Object} [options] Optional parameters to the command.
* @param {Object} [options] Optional parameters to the command.
* @param {Function} callback Callback function of format `function(err, result) {}`.

@@ -343,3 +303,3 @@ * @return {null} Returns no result

var keys = Object.keys(options);
// Decorate command with extra options

@@ -353,16 +313,15 @@ for(var i = 0; i < keys.length; i++) {

this.db.executeDbCommand(command, function(err, doc) {
if(err != null) return callback(err, null);
if(err != null) return callback(err, null);
doc = doc.documents[0];
if(doc.ok == 0) {
if(doc.ok === 0)
return callback(new Error("Error with validate command"), null);
} else if(doc.result != null && doc.result.constructor != String) {
if(doc.result != null && doc.result.constructor != String)
return callback(new Error("Error with validation data"), null);
} else if(doc.result != null && doc.result.match(/exception|corrupt/) != null) {
if(doc.result != null && doc.result.match(/exception|corrupt/) != null)
return callback(new Error("Error: invalid collection " + collectionName), null);
} else if(doc.valid != null && !doc.valid) {
return callback(new Error("Error: invalid collection " + collectionName), null);
} else {
return callback(null, doc);
}
if(doc.valid != null && !doc.valid)
return callback(new Error("Error: invalid collection " + collectionName), null);
return callback(null, doc);
});

@@ -373,3 +332,3 @@ };

* List the available databases
*
*
* @param {Function} callback Callback function of format `function(err, result) {}`.

@@ -381,9 +340,6 @@ * @return {null} Returns no result

// Execute the listAllDatabases command
this.db.executeDbAdminCommand({listDatabases:1}, {}, function(err, result) {
if(err != null) {
callback(err, null);
} else {
callback(null, result.documents[0]);
}
});
this.db.executeDbAdminCommand({listDatabases:1}, {}, function(err, doc) {
if(err != null) return callback(err, null);
return callback(null, doc.documents[0]);
});
}

@@ -401,12 +357,7 @@

this.command({replSetGetStatus:1}, function(err, result) {
if (err == null && result.documents[0].ok == 1) {
callback(null, result.documents[0]);
} else {
if (err) {
callback(err, false);
} else {
callback(self.db.wrap(result.documents[0]), false);
}
}
this.db.executeDbAdminCommand({replSetGetStatus:1}, function(err, doc) {
if(err == null && doc.documents[0].ok === 1)
return callback(null, doc.documents[0]);
if(err) return callback(err, false);
return callback(self.db.wrap(doc.documents[0]), false);
});

@@ -413,0 +364,0 @@ };

@@ -45,3 +45,3 @@ /**

if(!(this instanceof Collection)) return new Collection(db, collectionName, pkFactory, options);
checkCollectionName(collectionName);

@@ -56,6 +56,10 @@

this.raw = options == null || options.raw == null ? db.raw : options.raw;
this.readPreference = options == null || options.readPreference == null ? db.serverConfig.readPreference : options.readPreference;
this.readPreference = this.readPreference == null ? 'primary' : this.readPreference;
this.pkFactory = pkFactory == null
? ObjectID
: pkFactory;
var self = this;

@@ -82,3 +86,3 @@ }

if(!('function' === typeof callback)) callback = null;
var self = this;
var self = this;
insertAll(self, Array.isArray(docs) ? docs : [docs], options, callback);

@@ -131,6 +135,6 @@ return this;

}
// Ensure options
if(options == null) options = {};
if(!('function' === typeof callback)) callback = null;
if(!('function' === typeof callback)) callback = null;
// Ensure we have at least an empty selector

@@ -140,7 +144,14 @@ selector = selector == null ? {} : selector;

var flags = 0 | (options.single ? 1 : 0);
// DbName
var dbName = options['dbName'];
// If no dbname defined use the db one
if(dbName == null) {
dbName = this.db.databaseName;
}
// Create a delete command
var deleteCommand = new DeleteCommand(
this.db
, this.db.databaseName + "." + this.collectionName
, dbName + "." + this.collectionName
, selector

@@ -175,3 +186,3 @@ , flags);

error = error && error.documents;
if(!callback) return;
if(!callback) return;

@@ -184,6 +195,6 @@ if(err) {

callback(null, error[0].n);
}
});
}
});
} else {
var result = this.db._executeRemoveCommand(deleteCommand);
var result = this.db._executeRemoveCommand(deleteCommand);
// If no callback just return

@@ -214,3 +225,3 @@ if (!callback) return;

self.db._executeQueryCommand(DbCommand.createRenameCollectionCommand(self.db, self.collectionName, newName), function(err, result) {
if(err == null && result.documents[0].ok == 1) {
if(err == null && result.documents[0].ok == 1) {
if(callback != null) {

@@ -234,3 +245,3 @@ // Set current object to point to the new name

var insertAll = function insertAll (self, docs, options, callback) {
if('function' === typeof options) callback = options, options = {};
if('function' === typeof options) callback = options, options = {};
if(options == null) options = {};

@@ -245,3 +256,10 @@ if(!('function' === typeof callback)) callback = null;

}
// DbName
var dbName = options['dbName'];
// If no dbname defined use the db one
if(dbName == null) {
dbName = self.db.databaseName;
}
// Either use override on the function, or go back to default on either the collection

@@ -254,7 +272,7 @@ // level or db

}
// Pass in options
var insertCommand = new InsertCommand(
self.db
, self.db.databaseName + "." + self.collectionName, true, insertFlags);
, dbName + "." + self.collectionName, true, insertFlags);

@@ -264,3 +282,3 @@ // Add the documents and decorate them with id's if they have none

var doc = docs[index];
// Add id to each document if it's not already defined

@@ -273,3 +291,3 @@ if (!(Buffer.isBuffer(doc)) && doc['_id'] == null && self.db.forceServerObjectId != true) {

}
// Collect errorOptions

@@ -282,5 +300,5 @@ var errorOptions = options.safe != null ? options.safe : null;

if(errorOptions && errorOptions['safe'] != false && typeof callback !== 'function') throw new Error("safe cannot be used without a callback");
// Default command options
var commandOptions = {};
var commandOptions = {};
// If safe is defined check for error message

@@ -292,3 +310,3 @@ if(errorOptions && errorOptions != false) {

if(errorOptions == null) commandOptions['async'] = true;
// Set safe option

@@ -303,7 +321,7 @@ commandOptions['safe'] = errorOptions;

}
// Execute command with safe options (rolls up both command and safe command into one and executes them on the same connection)
self.db._executeInsertCommand(insertCommand, commandOptions, function (err, error) {
error = error && error.documents;
if(!callback) return;
if(!callback) return;

@@ -316,6 +334,6 @@ if (err) {

callback(null, docs);
}
});
} else {
var result = self.db._executeInsertCommand(insertCommand, commandOptions);
}
});
} else {
var result = self.db._executeInsertCommand(insertCommand, commandOptions);
// If no callback just return

@@ -350,3 +368,3 @@ if(!callback) return;

var errorOptions = options.safe != null ? options.safe : false;
var errorOptions = options.safe != null ? options.safe : false;
errorOptions = errorOptions == null && this.opts.safe != null ? this.opts.safe : errorOptions;

@@ -392,2 +410,9 @@ // Extract the id, if we have one we need to do a update command

// DbName
var dbName = options['dbName'];
// If no dbname defined use the db one
if(dbName == null) {
dbName = this.db.databaseName;
}
// Either use override on the function, or go back to default on either the collection

@@ -399,7 +424,7 @@ // level or db

options['serializeFunctions'] = this.serializeFunctions;
}
}
var updateCommand = new UpdateCommand(
this.db
, this.db.databaseName + "." + this.collectionName
, dbName + "." + this.collectionName
, selector

@@ -411,3 +436,3 @@ , document

// Unpack the error options if any
var errorOptions = (options && options.safe != null) ? options.safe : null;
var errorOptions = (options && options.safe != null) ? options.safe : null;
errorOptions = errorOptions == null && this.opts.safe != null ? this.opts.safe : errorOptions;

@@ -418,5 +443,5 @@ errorOptions = errorOptions == null && this.db.strict != null ? this.db.strict : errorOptions;

if(errorOptions && errorOptions['safe'] != false && typeof callback !== 'function') throw new Error("safe cannot be used without a callback");
// If we are executing in strict mode or safe both the update and the safe command must happen on the same line
if(errorOptions && errorOptions != false) {
if(errorOptions && errorOptions != false) {
// Insert options

@@ -439,3 +464,3 @@ var commandOptions = {read:false};

error = error && error.documents;
if(!callback) return;
if(!callback) return;

@@ -449,7 +474,7 @@ if(err) {

callback(null, error[0].n, error[0]);
}
});
}
});
} else {
// Execute update
var result = this.db._executeUpdateCommand(updateCommand);
var result = this.db._executeUpdateCommand(updateCommand);
// If no callback just return

@@ -467,6 +492,10 @@ if (!callback) return;

/**
* The distinct command returns returns a list of distinct values for the given key across a collection.
* The distinct command returns returns a list of distinct values for the given key across a collection.
*
* Options
* - **readPreference** {String}, the preferred read preference (Server.PRIMARY, Server.PRIMARY_PREFERRED, Server.SECONDARY, Server.SECONDARY_PREFERRED, Server.NEAREST).
*
* @param {String} key key to run distinct against.
* @param {Object} [query] option query to narrow the returned objects.
* @param {Object} [options] additional options during update.
* @param {Function} callback must be provided.

@@ -476,22 +505,24 @@ * @return {null}

*/
Collection.prototype.distinct = function distinct(key, query, callback) {
if ('function' === typeof query) callback = query, query = {};
Collection.prototype.distinct = function distinct(key, query, options, callback) {
var args = Array.prototype.slice.call(arguments, 1);
callback = args.pop();
query = args.length ? args.shift() : {};
options = args.length ? args.shift() : {};
var mapCommandHash = {
distinct: this.collectionName
, query: query
, key: key
'distinct': this.collectionName
, 'query': query
, 'key': key
};
// Set read preference if we set one
var readPreference = options['readPreference'] ? options['readPreference'] : false;
// Create the command
var cmd = DbCommand.createDbSlaveOkCommand(this.db, mapCommandHash);
this.db._executeQueryCommand(cmd, {read:true}, function (err, result) {
if (err) {
this.db._executeQueryCommand(cmd, {read:readPreference}, function (err, result) {
if(err)
return callback(err);
}
if (result.documents[0].ok != 1) {
if(result.documents[0].ok != 1)
return callback(new Error(result.documents[0].errmsg));
}
callback(null, result.documents[0].values);

@@ -504,3 +535,7 @@ });

*
* Options
* - **readPreference** {String}, the preferred read preference (Server.PRIMARY, Server.PRIMARY_PREFERRED, Server.SECONDARY, Server.SECONDARY_PREFERRED, Server.NEAREST).
*
* @param {Object} [query] query to filter by before performing count.
* @param {Object} [options] additional options during update.
* @param {Function} callback must be provided.

@@ -510,11 +545,19 @@ * @return {null}

*/
Collection.prototype.count = function count (query, callback) {
if ('function' === typeof query) callback = query, query = {};
Collection.prototype.count = function count (query, options, callback) {
var args = Array.prototype.slice.call(arguments, 0);
callback = args.pop();
query = args.length ? args.shift() : {};
options = args.length ? args.shift() : {};
// Final query
var final_query = {
count: this.collectionName
, query: query
, fields: null
'count': this.collectionName
, 'query': query
, 'fields': null
};
// Set read preference if we set one
var readPreference = options['readPreference'] ? options['readPreference'] : false;
// Set up query options
var queryOptions = QueryCommand.OPTS_NO_CURSOR_TIMEOUT;

@@ -536,13 +579,9 @@ if (this.slaveOk || this.db.slaveOk) {

var self = this;
this.db._executeQueryCommand(queryCommand, {read:true}, function (err, result) {
this.db._executeQueryCommand(queryCommand, {read:readPreference}, function (err, result) {
result = result && result.documents;
if(!callback) return;
if(!callback) return;
if (err) {
callback(err);
} else if (result[0].ok != 1 || result[0].errmsg) {
callback(self.db.wrap(result[0]));
} else {
callback(null, result[0].n);
}
if(err) return callback(err);
if (result[0].ok != 1 || result[0].errmsg) return callback(self.db.wrap(result[0]));
callback(null, result[0].n);
});

@@ -613,5 +652,5 @@ };

}
// Unpack the error options if any
var errorOptions = (options && options.safe != null) ? options.safe : null;
var errorOptions = (options && options.safe != null) ? options.safe : null;
errorOptions = errorOptions == null && this.opts.safe != null ? this.opts.safe : errorOptions;

@@ -623,3 +662,3 @@ errorOptions = errorOptions == null && this.db.strict != null ? this.db.strict : errorOptions;

// Add the find and modify command
commands.push(DbCommand.createDbSlaveOkCommand(this.db, queryObject, options));
commands.push(DbCommand.createDbCommand(this.db, queryObject, options));
// If we have safe defined we need to return both call results

@@ -631,18 +670,14 @@ var chainedCommands = errorOptions != null ? true : false;

}
// Fire commands and
this.db._executeQueryCommand(commands, function(err, result) {
// Fire commands and
this.db._executeQueryCommand(commands, {read:false}, function(err, result) {
result = result && result.documents;
if(err != null) {
callback(err);
} else if(result[0].err != null) {
callback(self.db.wrap(result[0]), null);
} else if(result[0].errmsg != null && !result[0].errmsg.match(eErrorMessages)) {
// Workaround due to 1.8.X returning an error on no matching object
// while 2.0.X does not not, making 2.0.X behaviour standard
callback(self.db.wrap(result[0]), null, result[0]);
} else {
return callback(null, result[0].value, result[0]);
}
if(err != null) return callback(err);
if(result[0].err != null) return callback(self.db.wrap(result[0]), null);
// Workaround due to 1.8.X returning an error on no matching object
// while 2.0.X does not not, making 2.0.X behaviour standard
if(result[0].errmsg != null && !result[0].errmsg.match(eErrorMessages))
return callback(self.db.wrap(result[0]), null, result[0]);
return callback(null, result[0].value, result[0]);
});

@@ -668,7 +703,7 @@ }

sort = args.length ? args.shift() : [];
options = args.length ? args.shift() : {};
options = args.length ? args.shift() : {};
// Add the remove option
options['remove'] = true;
// Execute the callback
this.findAndModify(query, sort, null, options, callback);
this.findAndModify(query, sort, null, options, callback);
}

@@ -678,3 +713,3 @@

, 'timeout' : 1, 'tailable' : 1, 'batchSize' : 1, 'raw' : 1, 'read' : 1
, 'returnKey' : 1, 'maxScan' : 1, 'min' : 1, 'max' : 1, 'showDiskLoc' : 1, 'comment' : 1};
, 'returnKey' : 1, 'maxScan' : 1, 'min' : 1, 'max' : 1, 'showDiskLoc' : 1, 'comment' : 1, 'dbName' : 1};

@@ -703,2 +738,3 @@ /**

* - **tailable** {Boolean, default:false}, specify if the cursor is tailable.
* - **awaitdata** {Boolean, default:false} allow the cursor to wait for data, only applicable for tailable cursor.
* - **batchSize** {Number, default:0}, set the batchSize for the getMoreCommand when iterating over the query results.

@@ -712,3 +748,4 @@ * - **returnKey** {Boolean, default:false}, only return the index key.

* - **raw** {Boolean, default:false}, Return all BSON documents as Raw Buffer documents.
* - **read** {Boolean, default:false}, Tell the query to read from a secondary server.
* - **readPreference** {String}, the preferred read preference ((Server.PRIMARY, Server.PRIMARY_PREFERRED, Server.SECONDARY, Server.SECONDARY_PREFERRED, Server.NEAREST).
* - **numberOfRetries** {Number, default:1}, if using awaidata specifies the number of times to retry on timeout.
*

@@ -736,7 +773,7 @@ * @param {Object} query query object to locate the object to modify

}
if(len === 2 && !Array.isArray(fields)) {
var fieldKeys = Object.getOwnPropertyNames(fields);
var is_option = false;
for(var i = 0; i < fieldKeys.length; i++) {

@@ -748,3 +785,3 @@ if(testForFields[fieldKeys[i]] != null) {

}
if(is_option) {

@@ -775,3 +812,3 @@ options = fields;

if(Buffer.isBuffer(object)) {
var object_size = object[0] | object[1] << 8 | object[2] << 16 | object[3] << 24;
var object_size = object[0] | object[1] << 8 | object[2] << 16 | object[3] << 24;
if(object_size != object.length) {

@@ -783,7 +820,7 @@ var error = new Error("query selector raw message size does not match message header size [" + object.length + "] != [" + object_size + "]");

}
// Validate correctness of the field selector
var object = fields;
if(Buffer.isBuffer(object)) {
var object_size = object[0] | object[1] << 8 | object[2] << 16 | object[3] << 24;
var object_size = object[0] | object[1] << 8 | object[2] << 16 | object[3] << 24;
if(object_size != object.length) {

@@ -794,4 +831,4 @@ var error = new Error("query fields raw message size does not match message header size [" + object.length + "] != [" + object_size + "]");

}
}
}
// Check special case where we are using an objectId

@@ -828,3 +865,11 @@ if(selector instanceof ObjectID) {

options.slaveOk = options.slaveOk != null ? options.slaveOk : this.db.slaveOk;
// Set option
var o = options;
// Support read/readPreference
if(o["read"] != null) o["readPreference"] = o["read"];
// Set the read preference
o.read = o["readPreference"] ? o.readPreference : this.readPreference;
// Adjust slave ok if read preference is secondary or secondary only
if(o.read == "secondary" || o.read == "secondaryOnly") options.slaveOk = true;

@@ -836,7 +881,9 @@ // callback for backward compatibility

, o.sort, o.hint, o.explain, o.snapshot, o.timeout, o.tailable, o.batchSize
, o.slaveOk, o.raw, o.read, o.returnKey, o.maxScan, o.min, o.max, o.showDiskLoc, o.comment));
, o.slaveOk, o.raw, o.read, o.returnKey, o.maxScan, o.min, o.max, o.showDiskLoc, o.comment, o.awaitdata
, o.numberOfRetries, o.dbName));
} else {
return new Cursor(this.db, this, selector, fields, o.skip, o.limit
, o.sort, o.hint, o.explain, o.snapshot, o.timeout, o.tailable, o.batchSize
, o.slaveOk, o.raw, o.read, o.returnKey, o.maxScan, o.min, o.max, o.showDiskLoc, o.comment);
, o.slaveOk, o.raw, o.read, o.returnKey, o.maxScan, o.min, o.max, o.showDiskLoc, o.comment, o.awaitdata
, o.numberOfRetries, o.dbName);
}

@@ -909,3 +956,3 @@ };

* - **raw** {Boolean, default:false}, Return all BSON documents as Raw Buffer documents.
* - **read** {Boolean, default:false}, Tell the query to read from a secondary server.
* - **readPreference** {String}, the preferred read preference (Server.PRIMARY, Server.PRIMARY_PREFERRED, Server.SECONDARY, Server.SECONDARY_PREFERRED, Server.NEAREST).
*

@@ -926,4 +973,4 @@ * @param {Object} query query object to locate the object to modify

if(err != null) return callback(err instanceof Error ? err : self.db.wrap(new Error(err)), null);
if(items.length == 1) return callback(null, items[0]);
callback(null, null);
if(items.length == 1) return callback(null, items[0]);
callback(null, null);
});

@@ -936,3 +983,3 @@ };

* Options
* - **safe** {true | {w:n, wtimeout:n} | {fsync:true}, default:false}, executes with a
* - **safe** {true | {w:n, wtimeout:n} | {fsync:true}, default:false}, executes with a
* - **unique** {Boolean, default:false}, creates an unique index.

@@ -963,3 +1010,3 @@ * - **sparse** {Boolean, default:false}, creates a sparse index.

errorOptions = errorOptions == null && this.db.strict != null ? this.db.strict : errorOptions;
// If we have a write concern set and no callback throw error

@@ -976,3 +1023,3 @@ if(errorOptions != null && errorOptions != false && (typeof callback !== 'function' && typeof options !== 'function')) throw new Error("safe cannot be used without a callback");

* Options
* - **safe** {true | {w:n, wtimeout:n} | {fsync:true}, default:false}, executes with a
* - **safe** {true | {w:n, wtimeout:n} | {fsync:true}, default:false}, executes with a
* - **unique** {Boolean, default:false}, creates an unique index.

@@ -1002,3 +1049,3 @@ * - **sparse** {Boolean, default:false}, creates a sparse index.

}
// Collect errorOptions

@@ -1008,6 +1055,6 @@ var errorOptions = options.safe != null ? options.safe : null;

errorOptions = errorOptions == null && this.db.strict != null ? this.db.strict : errorOptions;
// If we have a write concern set and no callback throw error
if(errorOptions != null && errorOptions != false && (typeof callback !== 'function' && typeof options !== 'function')) throw new Error("safe cannot be used without a callback");
// Execute create index

@@ -1084,3 +1131,3 @@ this.db.ensureIndex(this.collectionName, fieldOrSpec, options, callback);

* @return {null}
* @api public
* @api public
**/

@@ -1141,7 +1188,7 @@ Collection.prototype.reIndex = function(callback) {

}
var self = this;
var cmd = DbCommand.createDbSlaveOkCommand(this.db, mapCommandHash);
var cmd = DbCommand.createDbCommand(this.db, mapCommandHash);
this.db._executeQueryCommand(cmd, {read:true}, function (err, result) {
this.db._executeQueryCommand(cmd, {read:false}, function (err, result) {
if (err) {

@@ -1151,3 +1198,3 @@ return callback(err);

//
//
if (1 != result.documents[0].ok || result.documents[0].err || result.documents[0].errmsg) {

@@ -1164,21 +1211,25 @@ return callback(self.db.wrap(result.documents[0]));

// invoked with inline?
if (result.documents[0].results) {
if(result.documents[0].results) {
return callback(null, result.documents[0].results, stats);
}
// Create a collection object that wraps the result collection
self.db.collection(result.documents[0].result, function (err, collection) {
// If we wish for no verbosity
if(options['verbose'] == null || !options['verbose']) {
return callback(err, collection);
}
// The returned collection
var collection = null;
// Create statistics value
var stats = {};
if(result.documents[0].timeMillis) stats['processtime'] = result.documents[0].timeMillis;
if(result.documents[0].counts) stats['counts'] = result.documents[0].counts;
if(result.documents[0].timing) stats['timing'] = result.documents[0].timing;
// Return stats as third set of values
callback(err, collection, stats);
});
// If we have an object it's a different db
if(result.documents[0].result != null && typeof result.documents[0].result == 'object') {
var doc = result.documents[0].result;
collection = self.db.db(doc.db).collection(doc.collection);
} else {
// Create a collection object that wraps the result collection
collection = self.db.collection(result.documents[0].result)
}
// If we wish for no verbosity
if(options['verbose'] == null || !options['verbose']) {
return callback(err, collection);
}
// Return stats as third set of values
callback(err, collection, stats);
});

@@ -1221,9 +1272,13 @@ };

* Run a group command across a collection
*
* Options
* - **readPreference** {String}, the preferred read preference (Server.PRIMARY, Server.PRIMARY_PREFERRED, Server.SECONDARY, Server.SECONDARY_PREFERRED, Server.NEAREST).
*
* @param {Object|Array|Function|Code} keys an object, array or function expressing the keys to group by.
* @param {Object} condition an optional condition that must be true for a row to be considered.
* @param {Object} initial initial value of the aggregation counter object.
* @param {Object} initial initial value of the aggregation counter object.
* @param {Function|Code} reduce the reduce function aggregates (reduces) the objects iterated
* @param {Function|Code} finalize an optional function to be run on each item in the result set just before the item is returned.
* @param {Boolean} command specify if you wish to run using the internal group command or using eval, default is true.
* @param {Object} [options] additional options during update.
* @param {Function} callback returns the results.

@@ -1233,3 +1288,3 @@ * @return {null}

*/
Collection.prototype.group = function group(keys, condition, initial, reduce, finalize, command, callback) {
Collection.prototype.group = function group(keys, condition, initial, reduce, finalize, command, options, callback) {
var args = Array.prototype.slice.call(arguments, 3);

@@ -1241,2 +1296,3 @@ callback = args.pop();

command = args.length ? args.shift() : null;
options = args.length ? args.shift() : {};

@@ -1249,17 +1305,17 @@ // Make sure we are backward compatible

if (!Array.isArray(keys) && keys instanceof Object && typeof(keys) !== 'function') {
if (!Array.isArray(keys) && keys instanceof Object && typeof(keys) !== 'function' && !(keys instanceof Code)) {
keys = Object.keys(keys);
}
if(typeof reduce === 'function') {
reduce = reduce.toString();
}
if(typeof finalize === 'function') {
finalize = finalize.toString();
}
// Set up the command as default
command = command == null ? true : command;
// Execute using the command

@@ -1278,9 +1334,9 @@ if(command) {

, 'out': "inline"
}
}
};
// if finalize is defined
if(finalize != null) selector.group['finalize'] = finalize;
// Set up group selector
if ('function' === typeof keys) {
if ('function' === typeof keys || keys instanceof Code) {
selector.group.$keyf = keys instanceof Code

@@ -1298,6 +1354,8 @@ ? keys

var cmd = DbCommand.createDbSlaveOkCommand(this.db, selector);
this.db._executeQueryCommand(cmd, {read:true}, function (err, result) {
// Set read preference if we set one
var readPreference = options['readPreference'] ? options['readPreference'] : false;
this.db._executeQueryCommand(cmd, {read:readPreference}, function (err, result) {
if(err != null) return callback(err);
var document = result.documents[0];

@@ -1321,7 +1379,7 @@ if (null == document.retval) {

scope.initial = initial;
// Pass in the function text to execute within mongodb.
var groupfn = groupFunction.replace(/ reduce;/, reduce.toString() + ';');
this.db.eval(new Code(groupfn, scope), function (err, results) {
this.db.eval(new Code(groupfn, scope), function (err, results) {
if (err) return callback(err, null);

@@ -1361,3 +1419,3 @@ callback(null, results.result || results);

} else {
callback(null, document.capped);
callback(null, document && document.capped);
}

@@ -1386,3 +1444,3 @@ });

}
// All keys found return true

@@ -1393,3 +1451,3 @@ return callback(null, true);

}
});
});
}

@@ -1408,2 +1466,3 @@

* - **includeLocs** {Boolean, default:false}, include the location data fields in the top level of the results MongoDB > 2.X.
* - **readPreference** {String}, the preferred read preference ((Server.PRIMARY, Server.PRIMARY_PREFERRED, Server.SECONDARY, Server.SECONDARY_PREFERRED, Server.NEAREST).
*

@@ -1428,3 +1487,3 @@ * @param {Number} x point to search on the x axis, ensure the indexes are ordered in the same order.

}
// Decorate object if any with known properties

@@ -1440,3 +1499,3 @@ if(options['num'] != null) commandObject['num'] = options['num'];

// Execute the command
this.db.command(commandObject, callback);
this.db.command(commandObject, options, callback);
}

@@ -1451,2 +1510,3 @@

* - **limit** {Number}, max number of results to return.
* - **readPreference** {String}, the preferred read preference ((Server.PRIMARY, Server.PRIMARY_PREFERRED, Server.SECONDARY, Server.SECONDARY_PREFERRED, Server.NEAREST).
*

@@ -1471,3 +1531,3 @@ * @param {Number} x point to search on the x axis, ensure the indexes are ordered in the same order.

}
// Decorate object if any with known properties

@@ -1480,3 +1540,3 @@ if(options['maxDistance'] != null) commandObject['maxDistance'] = options['maxDistance'];

// Execute the command
this.db.command(commandObject, callback);
this.db.command(commandObject, options, callback);
}

@@ -1499,3 +1559,8 @@

*
* @param {Array|Objects} pipline a pipleline containing all the object for the execution.
* Options
* - **readPreference** {String}, the preferred read preference ((Server.PRIMARY, Server.PRIMARY_PREFERRED, Server.SECONDARY, Server.SECONDARY_PREFERRED, Server.NEAREST).
* - **explain** {Boolean}, return the query plan for the aggregation pipeline instead of the results.
*
* @param {Array} array containing all the aggregation framework commands for the execution.
* @param {Object} [options] additional options during update.
* @param {Function} callback returns matching documents.

@@ -1505,8 +1570,11 @@ * @return {null}

*/
Collection.prototype.aggregate = function(pipeline, callback) {
var args = Array.prototype.slice.call(arguments, 0);
Collection.prototype.aggregate = function(pipeline, options, callback) {
var args = Array.prototype.slice.call(arguments, 1);
callback = args.pop();
options = args.length ? args.shift() : {};
var self = this;
// Check if we have more than one argument then just make the pipeline
if(!Array.isArray(pipeline)) return callback(new Error("pipline must be an array"));
// Check if we have more than one argument then just make the pipeline
// the remaining arguments

@@ -1516,7 +1584,14 @@ if(args.length > 1) {

}
// Build the command
var command = { aggregate : this.collectionName, pipeline : pipeline};
// Add all options
var keys = Object.keys(options);
// Add all options
for(var i = 0; i < keys.length; i++) {
command[keys[i]] = options[keys[i]];
}
// Execute the command
this.db.command(command, function(err, result) {
this.db.command(command, options, function(err, result) {
if(err) {

@@ -1526,5 +1601,7 @@ callback(err);

callback(self.db.wrap(result));
} else if(typeof result == 'object' && result['serverPipeline']) {
callback(null, result);
} else {
callback(null, result.result);
}
}
});

@@ -1538,2 +1615,3 @@ }

* - **scale** {Number}, divide the returned sizes by scale value.
* - **readPreference** {String}, the preferred read preference ((Server.PRIMARY, Server.PRIMARY_PREFERRED, Server.SECONDARY, Server.SECONDARY_PREFERRED, Server.NEAREST).
*

@@ -1560,3 +1638,3 @@ * @param {Objects} [options] options for the map reduce job.

// Execute the command
this.db.command(commandObject, callback);
this.db.command(commandObject, options, callback);
}

@@ -1563,0 +1641,0 @@

@@ -50,7 +50,7 @@ var QueryCommand = require('./query_command').QueryCommand,

DbCommand.createGetNonceCommand = function(db) {
DbCommand.createGetNonceCommand = function(db, options) {
return new DbCommand(db, db.databaseName + "." + DbCommand.SYSTEM_COMMAND_COLLECTION, QueryCommand.OPTS_NO_CURSOR_TIMEOUT, 0, -1, {'getnonce':1}, null);
};
DbCommand.createAuthenticationCommand = function(db, username, password, nonce) {
DbCommand.createAuthenticationCommand = function(db, username, password, nonce, authdb) {
// Use node md5 generator

@@ -64,7 +64,7 @@ var md5 = crypto.createHash('md5');

md5.update(nonce + username + hash_password);
var key = md5.digest('hex');
var key = md5.digest('hex');
// Creat selector
var selector = {'authenticate':1, 'user':username, 'nonce':nonce, 'key':key};
// Create db command
return new DbCommand(db, db.databaseName + "." + DbCommand.SYSTEM_COMMAND_COLLECTION, QueryCommand.OPTS_NONE, 0, -1, selector, null);
return new DbCommand(db, authdb + "." + DbCommand.SYSTEM_COMMAND_COLLECTION, QueryCommand.OPTS_NONE, 0, -1, selector, null);
};

@@ -102,3 +102,3 @@

}
// Final command
// Final command
var command = {'getlasterror':1};

@@ -111,3 +111,3 @@ // If we have an options Object let's merge in the fields (fsync/wtimeout/w)

}
// Execute command

@@ -131,3 +131,3 @@ return new DbCommand(db, db.databaseName + "." + DbCommand.SYSTEM_COMMAND_COLLECTION, QueryCommand.OPTS_NO_CURSOR_TIMEOUT, 0, -1, command, null);

var keys;
// Get all the fields accordingly

@@ -162,3 +162,3 @@ if (fieldOrSpec.constructor === String) { // 'type'

}
// Generate the index name

@@ -173,3 +173,3 @@ var indexName = indexes.join("_");

options = options == null || typeof options == 'boolean' ? {} : options;
// Add all the options

@@ -181,3 +181,3 @@ var keys = Object.keys(options);

}
// If we don't have the unique property set on the selector

@@ -189,4 +189,6 @@ if(selector['unique'] == null) selector['unique'] = finalUnique;

DbCommand.logoutCommand = function(db, command_hash) {
return new DbCommand(db, db.databaseName + "." + DbCommand.SYSTEM_COMMAND_COLLECTION, QueryCommand.OPTS_NO_CURSOR_TIMEOUT, 0, -1, command_hash, null);
DbCommand.logoutCommand = function(db, command_hash, options) {
var dbName = options != null && options['authdb'] != null ? options['authdb'] : db.databaseName;
// Create logout command
return new DbCommand(db, dbName + "." + DbCommand.SYSTEM_COMMAND_COLLECTION, QueryCommand.OPTS_NO_CURSOR_TIMEOUT, 0, -1, command_hash, null);
}

@@ -214,4 +216,8 @@

DbCommand.createAdminDbCommandSlaveOk = function(db, command_hash) {
return new DbCommand(db, "admin." + DbCommand.SYSTEM_COMMAND_COLLECTION, QueryCommand.OPTS_NO_CURSOR_TIMEOUT | QueryCommand.OPTS_SLAVE, 0, -1, command_hash, null);
};
DbCommand.createDbSlaveOkCommand = function(db, command_hash, options) {
return new DbCommand(db, db.databaseName + "." + DbCommand.SYSTEM_COMMAND_COLLECTION, QueryCommand.OPTS_NO_CURSOR_TIMEOUT | QueryCommand.OPTS_SLAVE, 0, -1, command_hash, null, options);
};

@@ -31,3 +31,3 @@ var BaseCommand = require('./base_command').BaseCommand,

}
// Make sure we don't get a null exception

@@ -40,6 +40,10 @@ options = options == null ? {} : options;

this.numberToReturn = numberToReturn;
// Ensure we have no null query
query = query == null ? {} : query;
// Wrap query in the $query parameter so we can add read preferences for mongos
this.query = query;
this.returnFieldSelector = returnFieldSelector;
this.db = db;
// Let us defined on a command basis if we want functions to be serialized or not

@@ -56,2 +60,38 @@ if(options['serializeFunctions'] != null && options['serializeFunctions']) {

/*
* Adds the read prefrence to the current command
*/
QueryCommand.prototype.setMongosReadPreference = function(readPreference, tags) {
// Force the slave ok flag to be set
this.queryOptions |= QueryCommand.OPTS_SLAVE;
// Backward compatibility, ensure $query only set on read preference so 1.8.X works
if((readPreference != null || tags != null) && this.query['$query'] == null) {
this.query = {'$query': this.query};
}
// If we have no readPreference set and no tags, check if the slaveOk bit is set
if(readPreference == null && tags == null) {
// If we have a slaveOk bit set the read preference for MongoS
if(this.queryOptions & QueryCommand.OPTS_SLAVE) {
this.query['$readPreference'] = {mode: 'secondary'}
} else {
this.query['$readPreference'] = {mode: 'primary'}
}
}
// Build read preference object
if(typeof readPreference == 'object' && readPreference['_type'] == 'ReadPreference') {
this.query['$readPreference'] = readPreference.toObject();
} else if(readPreference != null) {
// Add the read preference
this.query['$readPreference'] = {mode: readPreference};
// If we have tags let's add them
if(tags != null) {
this.query['$readPreference']['tags'] = tags;
}
}
}
/*
struct {

@@ -68,10 +108,11 @@ MsgHeader header; // standard message header

QueryCommand.prototype.toBinary = function() {
// Total length of the command
var totalLengthOfCommand = 0;
// Calculate total length of the document
if(Buffer.isBuffer(this.query)) {
totalLengthOfCommand = 4 + Buffer.byteLength(this.collectionName) + 1 + 4 + 4 + this.query.length + (4 * 4);
totalLengthOfCommand = 4 + Buffer.byteLength(this.collectionName) + 1 + 4 + 4 + this.query.length + (4 * 4);
} else {
totalLengthOfCommand = 4 + Buffer.byteLength(this.collectionName) + 1 + 4 + 4 + this.db.bson.calculateObjectSize(this.query, this.serializeFunctions, true) + (4 * 4);
totalLengthOfCommand = 4 + Buffer.byteLength(this.collectionName) + 1 + 4 + 4 + this.db.bson.calculateObjectSize(this.query, this.serializeFunctions, true) + (4 * 4);
}
// Calculate extra fields size

@@ -90,3 +131,3 @@ if(this.returnFieldSelector != null && !(Buffer.isBuffer(this.returnFieldSelector))) {

// Write the header information to the buffer
_command[_index + 3] = (totalLengthOfCommand >> 24) & 0xff;
_command[_index + 3] = (totalLengthOfCommand >> 24) & 0xff;
_command[_index + 2] = (totalLengthOfCommand >> 16) & 0xff;

@@ -98,3 +139,3 @@ _command[_index + 1] = (totalLengthOfCommand >> 8) & 0xff;

// Write the request ID
_command[_index + 3] = (this.requestId >> 24) & 0xff;
_command[_index + 3] = (this.requestId >> 24) & 0xff;
_command[_index + 2] = (this.requestId >> 16) & 0xff;

@@ -111,3 +152,3 @@ _command[_index + 1] = (this.requestId >> 8) & 0xff;

// Write the op_code for the command
_command[_index + 3] = (QueryCommand.OP_QUERY >> 24) & 0xff;
_command[_index + 3] = (QueryCommand.OP_QUERY >> 24) & 0xff;
_command[_index + 2] = (QueryCommand.OP_QUERY >> 16) & 0xff;

@@ -120,3 +161,3 @@ _command[_index + 1] = (QueryCommand.OP_QUERY >> 8) & 0xff;

// Write the query options
_command[_index + 3] = (this.queryOptions >> 24) & 0xff;
_command[_index + 3] = (this.queryOptions >> 24) & 0xff;
_command[_index + 2] = (this.queryOptions >> 16) & 0xff;

@@ -130,6 +171,6 @@ _command[_index + 1] = (this.queryOptions >> 8) & 0xff;

_index = _index + _command.write(this.collectionName, _index, 'utf8') + 1;
_command[_index - 1] = 0;
_command[_index - 1] = 0;
// Write the number of documents to skip
_command[_index + 3] = (this.numberToSkip >> 24) & 0xff;
_command[_index + 3] = (this.numberToSkip >> 24) & 0xff;
_command[_index + 2] = (this.numberToSkip >> 16) & 0xff;

@@ -142,3 +183,3 @@ _command[_index + 1] = (this.numberToSkip >> 8) & 0xff;

// Write the number of documents to return
_command[_index + 3] = (this.numberToReturn >> 24) & 0xff;
_command[_index + 3] = (this.numberToReturn >> 24) & 0xff;
_command[_index + 2] = (this.numberToReturn >> 16) & 0xff;

@@ -165,3 +206,3 @@ _command[_index + 1] = (this.numberToReturn >> 8) & 0xff;

// Write the length to the document
_command[_index + 3] = (documentLength >> 24) & 0xff;
_command[_index + 3] = (documentLength >> 24) & 0xff;
_command[_index + 2] = (documentLength >> 16) & 0xff;

@@ -173,3 +214,3 @@ _command[_index + 1] = (documentLength >> 8) & 0xff;

// Add terminating 0 for the object
_command[_index - 1] = 0;
_command[_index - 1] = 0;

@@ -181,3 +222,3 @@ // Push field selector if available

// Write the length to the document
_command[_index + 3] = (documentLength >> 24) & 0xff;
_command[_index + 3] = (documentLength >> 24) & 0xff;
_command[_index + 2] = (documentLength >> 16) & 0xff;

@@ -189,3 +230,3 @@ _command[_index + 1] = (documentLength >> 8) & 0xff;

// Add terminating 0 for the object
_command[_index - 1] = 0;
_command[_index - 1] = 0;
}

@@ -200,6 +241,6 @@ } if(this.returnFieldSelector != null && Buffer.isBuffer(this.returnFieldSelector)) {

// Copy the data into the current buffer
object.copy(_command, _index);
object.copy(_command, _index);
// Write the length to the document
_command[_index + 3] = (documentLength >> 24) & 0xff;
_command[_index + 3] = (documentLength >> 24) & 0xff;
_command[_index + 2] = (documentLength >> 16) & 0xff;

@@ -211,5 +252,5 @@ _command[_index + 1] = (documentLength >> 8) & 0xff;

// Add terminating 0 for the object
_command[_index - 1] = 0;
_command[_index - 1] = 0;
}
// Return finished command

@@ -216,0 +257,0 @@ return _command;

@@ -12,4 +12,4 @@ var utils = require('./connection_utils'),

// Set up event emitter
EventEmitter.call(this);
// Keep all options for the socket in a specific collection allowing the user to specify the
EventEmitter.call(this);
// Keep all options for the socket in a specific collection allowing the user to specify the
// Wished upon socket connection parameters

@@ -23,3 +23,3 @@ this.socketOptions = typeof socketOptions === 'object' ? socketOptions : {};

this.minPoolSize = Math.floor(this.poolSize / 2) + 1;
// Set default settings for the socket options

@@ -34,13 +34,13 @@ utils.setIntegerParameter(this.socketOptions, 'timeout', 0);

// Allows you to set a throttling bufferSize if you need to stop overflows
utils.setIntegerParameter(this.socketOptions, 'bufferSize', 0);
utils.setIntegerParameter(this.socketOptions, 'bufferSize', 0);
// Internal structures
this.openConnections = [];
this.openConnections = [];
// Assign connection id's
this.connectionId = 0;
// Current index for selection of pool connection
this.currentConnectionIndex = 0;
// The pool state
this._poolState = 'disconnected';
this._poolState = 'disconnected';
// timeout control

@@ -55,7 +55,7 @@ this._timeout = false;

maxBsonSize = Connection.DEFAULT_MAX_BSON_SIZE;
}
}
for(var i = 0; i < this.openConnections.length; i++) {
this.openConnections[i].maxBsonSize = maxBsonSize;
}
}
}

@@ -66,3 +66,2 @@

return new function() {
var connectionStatus = _self._poolState;
// Create a new connection instance

@@ -97,10 +96,8 @@ var connection = new Connection(_self.connectionId++, _self.socketOptions);

// If we are already disconnected ignore the event
if(connectionStatus != 'disconnected' && _self.listeners("error").length > 0) {
_self.emit("error", err);
if(_self._poolState != 'disconnected' && _self.listeners("error").length > 0) {
_self.emit("error", err);
}
// Set disconnected
connectionStatus = 'disconnected';
// Set disconnected
_self._poolState = 'disconnected';
_self._poolState = 'disconnected';
// Stop

@@ -113,10 +110,7 @@ _self.stop();

// If we are already disconnected ignore the event
if(connectionStatus !== 'disconnected' && _self.listeners("close").length > 0) {
_self.emit("close");
if(_self._poolState !== 'disconnected' && _self.listeners("close").length > 0) {
_self.emit("close");
}
// Set disconnected
connectionStatus = 'disconnected';
// Set disconnected
_self._poolState = 'disconnected';
_self._poolState = 'disconnected';
// Stop

@@ -129,10 +123,7 @@ _self.stop();

// If we are already disconnected ignore the event
if(connectionStatus !== 'disconnected' && _self.listeners("timeout").length > 0) {
_self.emit("timeout", err);
if(_self._poolState !== 'disconnected' && _self.listeners("timeout").length > 0) {
_self.emit("timeout", err);
}
// Set disconnected
connectionStatus = 'disconnected';
// Set disconnected
_self._poolState = 'disconnected';
_self._poolState = 'disconnected';
// Stop

@@ -145,12 +136,10 @@ _self.stop();

// If we are already disconnected ignore the event
if(connectionStatus !== 'disconnected' && _self.listeners("parseError").length > 0) {
_self.emit("parseError", new Error("parseError occured"));
if(_self._poolState !== 'disconnected' && _self.listeners("parseError").length > 0) {
_self.emit("parseError", new Error("parseError occured"));
}
// Set disconnected
connectionStatus = 'disconnected';
_self.stop();
});
connection.on("message", function(message) {
connection.on("message", function(message) {
_self.emit("message", message);

@@ -160,3 +149,3 @@ });

// Start connection in the next tick
connection.start();
connection.start();
}();

@@ -167,3 +156,3 @@ }

// Start method, will throw error if no listeners are available
// Pass in an instance of the listener that contains the api for
// Pass in an instance of the listener that contains the api for
// finding callbacks for a given message etc.

@@ -177,5 +166,5 @@ ConnectionPool.prototype.start = function() {

}
// Set pool state to connecting
this._poolState = 'connecting';
this._poolState = 'connecting';
this._timeout = false;

@@ -198,7 +187,7 @@

// Set disconnected
this._poolState = 'disconnected';
this._poolState = 'disconnected';
// Clear all listeners if specified
if(removeListeners) {
this.removeAllEventListeners();
this.removeAllEventListeners();
}

@@ -210,5 +199,5 @@

}
// Clean up
this.openConnections = [];
this.openConnections = [];
}

@@ -215,0 +204,0 @@

@@ -11,3 +11,3 @@ var utils = require('./connection_utils'),

// Set up event emitter
EventEmitter.call(this);
EventEmitter.call(this);
// Store all socket options

@@ -19,7 +19,7 @@ this.socketOptions = socketOptions ? socketOptions : {host:'localhost', port:27017};

this.connected = false;
//
// Connection parsing state
//
this.maxBsonSize = socketOptions.maxBsonSize ? socketOptions.maxBsonSize : Connection.DEFAULT_MAX_BSON_SIZE;
this.maxBsonSize = socketOptions.maxBsonSize ? socketOptions.maxBsonSize : Connection.DEFAULT_MAX_BSON_SIZE;
// Contains the current message bytes

@@ -51,5 +51,5 @@ this.buffer = null;

// Create a new stream
this.connection = new net.Socket();
// Set options on the socket
this.connection.setTimeout(this.socketOptions.timeout);
this.connection = new net.Socket();
// Set timeout allowing backward compatibility to timeout if no connectTimeoutMS is set
this.connection.setTimeout(this.socketOptions.connectTimeoutMS != null ? this.socketOptions.connectTimeoutMS : this.socketOptions.timeout);
// Work around for 0.4.X

@@ -63,5 +63,5 @@ if(process.version.indexOf("v0.4") == -1) this.connection.setNoDelay(this.socketOptions.noDelay);

this.connection.setKeepAlive(false);
}
}
}
// Set up pair for tls with server, accept self-signed certificates as well

@@ -72,3 +72,3 @@ var pair = this.pair = tls.createSecurePair(false);

this.connection.pipe(this.pair.encrypted);
// Setup clearText stream

@@ -92,3 +92,3 @@ this.writeSteam = this.pair.cleartext;

// Set options on the socket
this.connection.setTimeout(this.socketOptions.timeout);
this.connection.setTimeout(this.socketOptions.connectTimeoutMS != null ? this.socketOptions.connectTimeoutMS : this.socketOptions.timeout);
// Work around for 0.4.X

@@ -102,3 +102,3 @@ if(process.version.indexOf("v0.4") == -1) this.connection.setNoDelay(this.socketOptions.noDelay);

this.connection.setKeepAlive(false);
}
}
}

@@ -117,3 +117,3 @@

this.connection.on("close", closeHandler(this));
}
}
}

@@ -133,3 +133,3 @@

var binaryCommand = command[i].toBinary()
if(binaryCommand.length > this.maxBsonSize) return callback(new Error("Document exceeds maximal allowed bson size of " + this.maxBsonSize + " bytes"));
if(!this.socketOptions['disableDriverBSONSizeCheck'] && binaryCommand.length > this.maxBsonSize) return callback(new Error("Document exceeds maximal allowed bson size of " + this.maxBsonSize + " bytes"));
if(this.logger != null && this.logger.doDebug) this.logger.debug("writing command to mongodb", binaryCommand);

@@ -140,8 +140,8 @@ var r = this.writeSteam.write(binaryCommand);

var binaryCommand = command.toBinary()
if(binaryCommand.length > this.maxBsonSize) return callback(new Error("Document exceeds maximal allowed bson size of " + this.maxBsonSize + " bytes"));
if(!this.socketOptions['disableDriverBSONSizeCheck'] && binaryCommand.length > this.maxBsonSize) return callback(new Error("Document exceeds maximal allowed bson size of " + this.maxBsonSize + " bytes"));
if(this.logger != null && this.logger.doDebug) this.logger.debug("writing command to mongodb", binaryCommand);
var r = this.writeSteam.write(binaryCommand);
}
} catch (err) {
if(typeof callback === 'function') callback(err);
}
} catch (err) {
if(typeof callback === 'function') callback(err);
}

@@ -151,3 +151,3 @@ }

// Force the closure of the connection
Connection.prototype.close = function() {
Connection.prototype.close = function() {
// clear out all the listeners

@@ -162,12 +162,12 @@ resetHandlers(this, true);

// Reset all handlers
var resetHandlers = function(self, clearListeners) {
var resetHandlers = function(self, clearListeners) {
self.eventHandlers = {error:[], connect:[], close:[], end:[], timeout:[], parseError:[], message:[]};
// If we want to clear all the listeners
if(clearListeners && self.connection != null) {
var keys = Object.keys(self.eventHandlers);
var keys = Object.keys(self.eventHandlers);
// Remove all listeners
for(var i = 0; i < keys.length; i++) {
self.connection.removeAllListeners(keys[i]);
}
}
}

@@ -182,5 +182,7 @@ }

var connectHandler = function(self) {
return function() {
return function() {
// Set connected
self.connected = true;
// Now that we are connected set the socket timeout
self.connection.setTimeout(self.socketOptions.socketTimeoutMS != null ? self.socketOptions.socketTimeoutMS : self.socketOptions.timeout);
// Emit the connect event with no error

@@ -215,3 +217,3 @@ self.emit("connect", null, self);

data = data.slice(remainingBytesToRead);
// Emit current complete message

@@ -226,6 +228,6 @@ try {

// Emit the buffer
self.emit("message", emitBuffer, self);
self.emit("message", emitBuffer, self);
} catch(err) {
var errorObject = {err:"socketHandler", trace:err, bin:buffer, parseState:{
sizeOfMessage:self.sizeOfMessage,
sizeOfMessage:self.sizeOfMessage,
bytesRead:self.bytesRead,

@@ -241,6 +243,6 @@ stubBuffer:self.stubBuffer}};

// size of the message (< 4 bytes)
if(self.stubBuffer != null && self.stubBuffer.length > 0) {
if(self.stubBuffer != null && self.stubBuffer.length > 0) {
// If we have enough bytes to determine the message size let's do it
if(self.stubBuffer.length + data.length > 4) {
if(self.stubBuffer.length + data.length > 4) {
// Prepad the data

@@ -277,5 +279,5 @@ var newData = new Buffer(self.stubBuffer.length + data.length);

var errorObject = {err:"socketHandler", trace:'', bin:self.buffer, parseState:{
sizeOfMessage:sizeOfMessage,
bytesRead:self.bytesRead,
stubBuffer:self.stubBuffer}};
sizeOfMessage: sizeOfMessage,
bytesRead: self.bytesRead,
stubBuffer: self.stubBuffer}};
if(self.logger != null && self.logger.doError) self.logger.error("parseError", errorObject);

@@ -300,3 +302,3 @@ // We got a parse Error fire it off then keep going

data = new Buffer(0);
} else if(sizeOfMessage > 4 && sizeOfMessage < self.maxBsonSize && sizeOfMessage == data.length) {

@@ -313,6 +315,6 @@ try {

// Emit the message
self.emit("message", emitBuffer, self);
self.emit("message", emitBuffer, self);
} catch (err) {
var errorObject = {err:"socketHandler", trace:err, bin:self.buffer, parseState:{
sizeOfMessage:self.sizeOfMessage,
sizeOfMessage:self.sizeOfMessage,
bytesRead:self.bytesRead,

@@ -323,8 +325,8 @@ stubBuffer:self.stubBuffer}};

self.emit("parseError", errorObject, self);
}
}
} else if(sizeOfMessage <= 4 || sizeOfMessage > self.maxBsonSize) {
var errorObject = {err:"socketHandler", trace:null, bin:data, parseState:{
sizeOfMessage:sizeOfMessage,
sizeOfMessage:sizeOfMessage,
bytesRead:0,
buffer:null,
buffer:null,
stubBuffer:null}};

@@ -335,3 +337,3 @@ if(self.logger != null && self.logger.doError) self.logger.error("parseError", errorObject);

// Clear out the state of the parser
// Clear out the state of the parser
self.buffer = null;

@@ -353,3 +355,3 @@ self.sizeOfMessage = 0;

// Copy rest of message
data = data.slice(sizeOfMessage);
data = data.slice(sizeOfMessage);
// Emit the message

@@ -359,3 +361,3 @@ self.emit("message", emitBuffer, self);

var errorObject = {err:"socketHandler", trace:err, bin:self.buffer, parseState:{
sizeOfMessage:sizeOfMessage,
sizeOfMessage:sizeOfMessage,
bytesRead:self.bytesRead,

@@ -368,3 +370,3 @@ stubBuffer:self.stubBuffer}};

}
}
} else {

@@ -379,3 +381,3 @@ // Create a buffer that contains the space for the non-complete message

}
}
}
}

@@ -390,7 +392,7 @@ }

// Emit end event
self.emit("end", {err: 'connection received Fin packet from [' + self.socketOptions.host + ':' + self.socketOptions.port + ']'}, self);
self.emit("end", {err: 'connection received Fin packet from [' + self.socketOptions.host + ':' + self.socketOptions.port + ']'}, self);
}
}
var timeoutHandler = function(self) {
var timeoutHandler = function(self) {
return function() {

@@ -406,3 +408,3 @@ self.emit("timeout", {err: 'connection to [' + self.socketOptions.host + ':' + self.socketOptions.port + '] timed out'}, self);

var errorHandler = function(self) {
var errorHandler = function(self) {
return function(err) {

@@ -417,5 +419,5 @@ // Set connected to false

var closeHandler = function(self) {
return function(hadError) {
return function(hadError) {
// If we have an error during the connection phase
if(hadError && !self.connected) {
if(hadError && !self.connected) {
// Set disconnected

@@ -429,3 +431,3 @@ self.connected = false;

// Emit close
self.emit("close", {err: 'connection closed to [' + self.socketOptions.host + ':' + self.socketOptions.port + ']'}, self);
self.emit("close", {err: 'connection closed to [' + self.socketOptions.host + ':' + self.socketOptions.port + ']'}, self);
}

@@ -432,0 +434,0 @@ }

var Connection = require('./connection').Connection,
ReadPreference = require('./read_preference').ReadPreference,
DbCommand = require('../commands/db_command').DbCommand,

@@ -8,3 +9,3 @@ MongoReply = require('../responses/mongo_reply').MongoReply,

inspect = require('util').inspect,
Server = require('./server').Server,
Server = require('./server').Server,
PingStrategy = require('./strategies/ping_strategy').PingStrategy,

@@ -28,16 +29,19 @@ StatisticsStrategy = require('./strategies/statistics_strategy').StatisticsStrategy;

* Options
* - **ha** {Boolean, default:false}, turn on high availability.
* - **ha** {Boolean, default:true}, turn on high availability.
* - **haInterval** {Number, default:2000}, time between each replicaset status check.
* - **reconnectWait** {Number, default:1000}, time to wait in miliseconds before attempting reconnect.
* - **retries** {Number, default:30}, number of times to attempt a replicaset reconnect.
* - **rs_name** {String}, the name of the replicaset to connect to.
* - **readPreference** {String}, the prefered read preference (Server.READ_PRIMARY, Server.READ_SECONDARY, Server.READ_SECONDARY_ONLY).
* - **read_secondary** {Boolean, deprecated}, allow reads from secondary.
* - **rs_name** {String}, the name of the replicaset to connect to.
* - **socketOptions** {Object, default:null}, an object containing socket options to use (noDelay:(boolean), keepAlive:(number), connectTimeoutMS:(number), socketTimeoutMS:(number))
* - **readPreference** {String}, the prefered read preference (ReadPreference.PRIMARY, ReadPreference.PRIMARY_PREFERRED, ReadPreference.SECONDARY, ReadPreference.SECONDARY_PREFERRED, ReadPreference.NEAREST).
* - **strategy** {String, default:null}, selection strategy for reads choose between (ping and statistical, default is round-robin)
* - **secondaryAcceptableLatencyMS** {Number, default:15}, sets the range of servers to pick when using NEAREST (lowest ping ms + the latency fence, ex: range of 1 to (1 + 15) ms)
*
* @class Represents a Replicaset Configuration
* @param {Array} list of server objects participating in the replicaset.
* @param {Object} [options] additional options for the collection.
* @param {Array} list of server objects participating in the replicaset.
* @param {Object} [options] additional options for the replicaset connection.
*/
var ReplSet = exports.ReplSet = function(servers, options) {
this.count = 0;
// Set up basic

@@ -50,2 +54,8 @@ if(!(this instanceof ReplSet))

// Ensure no Mongos's
for(var i = 0; i < servers.length; i++) {
if(!(servers[i] instanceof Server)) throw new Error("list of servers must be of type Server");
}
// Just reference for simplicity
var self = this;

@@ -62,6 +72,6 @@ // Contains the master server entry

this.closedConnectionCount = 0;
this._used = false;
this._used = false;
// Default poolSize for new server instances
this.poolSize = this.options.poolSize == null ? 1 : this.options.poolSize;
this.poolSize = this.options.poolSize == null ? 5 : this.options.poolSize;
this._currentServerChoice = 0;

@@ -82,27 +92,28 @@

this.recordQueryStats = false;
// Get the readPreference
var readPreference = this.options['readPreference'];
// Read preference setting
var readPreference = this.options['readPreference'];
// Validate correctness of Read preferences
if(readPreference != null) {
if(readPreference != Server.READ_PRIMARY && readPreference != Server.READ_SECONDARY_ONLY
&& readPreference != Server.READ_SECONDARY) {
throw new Error("Illegal readPreference mode specified, " + readPreference);
if(readPreference != ReadPreference.PRIMARY && readPreference != ReadPreference.PRIMARY_PREFERRED
&& readPreference != ReadPreference.SECONDARY && readPreference != ReadPreference.SECONDARY_PREFERRED
&& readPreference != ReadPreference.NEAREST && typeof readPreference != 'object' && readPreference['_type'] != 'ReadPreference') {
throw new Error("Illegal readPreference mode specified, " + readPreference);
}
// Set read Preference
this._readPreference = readPreference;
} else {
this._readPreference = null;
this._readPreference = null;
}
// Strategy for picking a secondary
// this.strategy = this.options['strategy'] == null ? 'statistical' : this.options['strategy'];
this.secondaryAcceptableLatencyMS = this.options['secondaryAcceptableLatencyMS'] == null ? 15 : this.options['secondaryAcceptableLatencyMS'];
this.strategy = this.options['strategy'];
// Make sure strategy is one of the two allowed
if(this.strategy != null && (this.strategy != 'ping' && this.strategy != 'statistical')) throw new Error("Only ping or statistical strategies allowed");
if(this.strategy != null && (this.strategy != 'ping' && this.strategy != 'statistical')) throw new Error("Only ping or statistical strategies allowed");
// Let's set up our strategy object for picking secodaries
if(this.strategy == 'ping') {
// Create a new instance
this.strategyInstance = new PingStrategy(this);
this.strategyInstance = new PingStrategy(this, this.secondaryAcceptableLatencyMS);
} else if(this.strategy == 'statistical') {

@@ -113,14 +124,14 @@ // Set strategy as statistical

this.enableRecordQueryStats(true);
}
}
// Set default connection pool options
this.socketOptions = this.options.socketOptions != null ? this.options.socketOptions : {};
this.socketOptions = this.options.socketOptions != null ? this.options.socketOptions : {};
// Set up logger if any set
this.logger = this.options.logger != null
&& (typeof this.options.logger.debug == 'function')
&& (typeof this.options.logger.error == 'function')
&& (typeof this.options.logger.debug == 'function')
this.logger = this.options.logger != null
&& (typeof this.options.logger.debug == 'function')
&& (typeof this.options.logger.error == 'function')
&& (typeof this.options.logger.debug == 'function')
? this.options.logger : {error:function(message, object) {}, log:function(message, object) {}, debug:function(message, object) {}};
// Ensure all the instances are of type server and auto_reconnect is false

@@ -142,9 +153,16 @@ if(!Array.isArray(servers) || servers.length == 0) {

}
}
}
// Enabled ha
this.haEnabled = this.options['ha'] == null ? false : this.options['ha'];
this.haEnabled = this.options['ha'] == null ? true : this.options['ha'];
// How often are we checking for new servers in the replicaset
this.replicasetStatusCheckInterval = this.options['haInterval'] == null ? 2000 : this.options['haInterval'];
this.replicasetStatusCheckInterval = this.options['haInterval'] == null ? 1000 : this.options['haInterval'];
this._replicasetTimeoutId = null;
// Connection timeout
this._connectTimeoutMS = 1000;
// Current list of servers to test
this.pingCandidateServers = [];
// Last replicaset check time
this.lastReplicaSetTime = new Date().getTime();
};

@@ -165,3 +183,3 @@

// Ensure slaveOk is correct for secodnaries read preference and tags
if((this._readPreference == Server.READ_SECONDARY || this._readPreference == Server.READ_SECONDARY_ONLY)
if((this._readPreference == ReadPreference.SECONDARY_PREFERRED || this._readPreference == ReadPreference.SECONDARY)
|| (this._readPreference != null && typeof this._readPreference == 'object')) {

@@ -173,6 +191,6 @@ this.slaveOk = true;

/**
* Return the used state
* @ignore
*/
// Return the used state
ReplSet.prototype._isUsed = function() {
ReplSet.prototype._isUsed = function() {
return this._used;

@@ -184,5 +202,5 @@ }

*/
ReplSet.prototype.setTarget = function(target) {
this.target = target;
};
ReplSet.prototype.isMongos = function() {
return false;
}

@@ -200,3 +218,3 @@ /**

*/
Server.prototype.isSetMember = function() {
ReplSet.prototype.isSetMember = function() {
return false;

@@ -209,3 +227,3 @@ }

ReplSet.prototype.isPrimary = function(config) {
return this.readSecondary && this.secondaries.length > 0 ? false : true;
return this.readSecondary && Object.keys(this._state.secondaries).length > 0 ? false : true;
}

@@ -220,52 +238,12 @@

* @ignore
*/
// Clean up dead connections
var cleanupConnections = ReplSet.cleanupConnections = function(connections, addresses, byTags) {
// Ensure we don't have entries in our set with dead connections
var keys = Object.keys(connections);
for(var i = 0; i < keys.length; i++) {
var server = connections[keys[i]];
// If it's not connected remove it from the list
if(!server.isConnected()) {
// Remove from connections and addresses
delete connections[keys[i]];
delete addresses[keys[i]];
// Clean up tags if needed
if(server.tags != null && typeof server.tags === 'object') {
cleanupTags(server, byTags);
}
}
}
}
/**
* @ignore
*/
var cleanupTags = ReplSet._cleanupTags = function(server, byTags) {
var serverTagKeys = Object.keys(server.tags);
// Iterate over all server tags and remove any instances for that tag that matches the current
// server
for(var i = 0; i < serverTagKeys.length; i++) {
// Fetch the value for the tag key
var value = server.tags[serverTagKeys[i]];
// If we got an instance of the server
if(byTags[serverTagKeys[i]] != null
&& byTags[serverTagKeys[i]][value] != null
&& Array.isArray(byTags[serverTagKeys[i]][value])) {
// List of clean servers
var cleanInstances = [];
// We got instances for the particular tag set
var instances = byTags[serverTagKeys[i]][value];
for(var j = 0, jlen = instances.length; j < jlen; j++) {
var serverInstance = instances[j];
// If we did not find an instance add it to the clean instances
if((serverInstance.host + ":" + serverInstance.port) !== (server.host + ":" + server.port)) {
cleanInstances.push(serverInstance);
}
}
// Update the byTags list
byTags[serverTagKeys[i]][value] = cleanInstances;
}
* @private
**/
ReplSet.prototype._checkReplicaSet = function() {
if(!this.haEnabled) return false;
var currentTime = new Date().getTime();
if((currentTime - this.lastReplicaSetTime) >= this.replicasetStatusCheckInterval) {
this.lastReplicaSetTime = currentTime;
return true;
} else {
return false;
}

@@ -310,3 +288,2 @@ }

*/
// Ensure no callback is left hanging when we have an error
var __executeAllCallbacksWithError = function(dbInstance, error) {

@@ -325,7 +302,112 @@ var keys = Object.keys(dbInstance._callBackStore._notReplied);

* @ignore
* @private
*/
ReplSet.prototype._validateReplicaset = function(result, auths) {
var self = this;
// For each member we need to check if we have a new connection that needs to be established
var members = result['documents'][0]['members'];
// Get members
var members = Array.isArray(result['documents'][0]['members']) ? result['documents'][0]['members'] : [];
// The total members we check
var serversToConnectList = {};
// Iterate over all the members and see if we need to reconnect
for(var i = 0, jlen = members.length; i < jlen; i++) {
var member = members[i];
if(member['health'] != 0
&& null == self._state['addresses'][member['name']]
&& null == serversToConnectList[member['name']]) {
// Split the server string
var parts = member.name.split(/:/);
if(parts.length == 1) {
parts = [parts[0], Connection.DEFAULT_PORT];
}
// Default empty socket options object
var socketOptions = {host:parts[0], port:parseInt(parts[1], 10)};
// If a socket option object exists clone it
if(self.socketOptions != null) {
var keys = Object.keys(self.socketOptions);
for(var k = 0; k < keys.length;k++) socketOptions[keys[i]] = self.socketOptions[keys[i]];
}
// Create a new server instance
var newServer = new Server(parts[0], parseInt(parts[1], 10), {auto_reconnect:false, 'socketOptions':socketOptions
, logger:self.logger, ssl:self.ssl, poolSize:self.poolSize});
// Set the replicaset instance
newServer.replicasetInstance = self;
// Add handlers
newServer.on("close", self.closeHandler);
newServer.on("timeout", self.timeoutHandler);
newServer.on("error", self.errorHandler);
// Add to list of server connection target
serversToConnectList[member['name']] = newServer;
} else if(member['stateStr'] == 'PRIMARY' && self._state.master['name'] != member['name']) {
// Delete master record so we can rediscover it
delete self._state['addresses'][self._state.master['name']];
// Update inormation on new primary
var newMaster = self._state.addresses[member['name']];
newMaster.isMasterDoc.ismaster = true;
newMaster.isMasterDoc.secondary = false;
self._state.master = newMaster;
// Remove from secondaries
delete self._state.secondaries[member['name']];
newMaster = null;
}
}
// All servers we want to connect to
var serverKeys = Object.keys(serversToConnectList);
// For all remaining servers on the list connect
while(serverKeys.length > 0) {
var _serverKey = serverKeys.pop();
// Fetch the server
var _server = serversToConnectList[_serverKey];
// Add a new server to the total number of servers that need to initialized before we are done
var newServerCallback = self.connectionHandler(_server);
// Connect To the new server
_server.connect(self.db, {returnIsMasterResults: true, eventReceiver:newServer}, function(err, result, _server) {
if(err == null && result != null) {
// Fetch the myState
var document = result.documents[0];
// Remove from list until
if(document.ismaster || document.secondary || document.arbiterOnly) {
process.nextTick(function() {
// Apply any auths
if(Array.isArray(auths) && auths.length > 0) {
// Get number of auths we need to execute
var numberOfAuths = auths.length;
// Apply all auths
for(var i = 0; i < auths.length; i++) {
self.db.authenticate(auths[i].username, auths[i].password, {'authdb':auths[i].authdb}, function(err, authenticated) {
numberOfAuths = numberOfAuths - 1;
// If we have no more authentications to replay
if(numberOfAuths == 0) {
newServerCallback(err, result, _server);
}
});
}
} else {
newServerCallback(err, result, _server);
}
});
} else {
_server.close();
}
} else {
_server.close();
}
});
}
}
/**
* @ignore
*/
ReplSet.prototype.connect = function(parent, options, callback) {
var self = this;
var dateStamp = new Date().getTime();
if('function' === typeof options) callback = options, options = {};
if('function' === typeof options) callback = options, options = {};
if(options == null) options = {};

@@ -347,8 +429,13 @@ if(!('function' === typeof callback)) callback = null;

// Clean up state
replSetSelf._state = {'master':null, 'secondaries':{}, 'arbiters':{}, 'passives':{}, 'errors':{}, 'addresses':{}, 'byTags':{}, 'setName':null, 'errorMessages':[], 'members':[]};
replSetSelf._state = {'master':null, 'secondaries':{}, 'arbiters':{}, 'passives':{}
, 'errors':{}, 'addresses':{}, 'setName':null, 'errorMessages':[], 'members':[]};
// Create a connection handler
// self.connectionHandler = null != self.connectionHandler ? self.connectionHandler : function(instanceServer) {
self.connectionHandler = function(instanceServer) {
return function(err, result) {
self.count = self.count + 1;
// If we found a master call it at the end
var masterCallback = null;
// Remove a server from the list of intialized servers we need to perform

@@ -360,6 +447,6 @@ self._numberOfServersLeftToInitialize = self._numberOfServersLeftToInitialize - 1;

}
// Add enable query information
instanceServer.enableRecordQueryStats(replSetSelf.recordQueryStats);
if(err == null && result.documents[0].hosts != null) {

@@ -379,9 +466,9 @@ // Fetch the isMaster command result

var primary = document.primary;
// Ensure we are keying on the same name for lookups as mongodb might return
// dns name and the driver is using ip's
// Ensure we are keying on the same name for lookups as mongodb might return
// dns name and the driver is using ip's
// Rename the connection so we are keying on the name used by mongod
var userProvidedServerString = instanceServer.host + ":" + instanceServer.port;
var userProvidedServerString = instanceServer.host + ":" + instanceServer.port;
var me = document.me || userProvidedServerString;
// If we have user provided entries already, switch them to avoid additional

@@ -404,12 +491,12 @@ // open connections

replSetSelf._state['arbiters'][me] = server;
}
}
// Set name of the server
server.name = me;
// Add the existing one to the replicaset list of addresses
replSetSelf._state['addresses'][me] = server;
replSetSelf._state['addresses'][me] = server;
} else {
instanceServer.name = me;
}
// Only add server to our internal list if it's a master, secondary or arbiter

@@ -427,2 +514,5 @@ if(isMaster == true || secondary == true || arbiterOnly == true) {

__executeAllCallbacksWithError(parent, err);
// Set the parent
if(typeof parent.openCalled != 'undefined')
parent.openCalled = false;
// Ensure single callback only

@@ -434,3 +524,3 @@ if(callback != null) {

// Return the error
internalCallback(err, null);
internalCallback(err, null, replSetSelf);
} else {

@@ -444,10 +534,9 @@ // If the parent has listeners trigger an event

}
}
}
// Check if this is the primary server, then disconnect otherwise keep going
if(replSetSelf._state.master != null) {
var primaryAddress = replSetSelf._state.master.host + ":" + replSetSelf._state.master.port;
// var errorServerAddress = server.host + ":" + server.port;
var errorServerAddress = server.name;
// Only shut down the set if we have a primary server error

@@ -466,8 +555,8 @@ if(primaryAddress == errorServerAddress) {

delete replSetSelf._state.passives[errorServerAddress];
}
}
// Check if we are reading from Secondary only
if(replSetSelf._readPreference == Server.READ_SECONDARY_ONLY && Object.keys(replSetSelf._state.secondaries).length == 0) {
if(replSetSelf._readPreference == ReadPreference.SECONDARY && Object.keys(replSetSelf._state.secondaries).length == 0) {
closeServers();
}
}
}

@@ -489,2 +578,5 @@ } else {

__executeAllCallbacksWithError(parent, err);
// Set the parent
if(typeof parent.openCalled != 'undefined')
parent.openCalled = false;
// Ensure single callback only

@@ -496,3 +588,3 @@ if(callback != null) {

// Return the error
internalCallback(new Error("connection timed out"), null);
internalCallback(new Error("connection timed out"), null, replSetSelf);
} else {

@@ -506,4 +598,4 @@ // If the parent has listeners trigger an event

}
}
}
// Check if this is the primary server, then disconnect otherwise keep going

@@ -513,3 +605,3 @@ if(replSetSelf._state.master != null) {

var errorServerAddress = server.name;
// Only shut down the set if we have a primary server error

@@ -528,8 +620,8 @@ if(primaryAddress == errorServerAddress) {

delete replSetSelf._state.passives[errorServerAddress];
}
}
// Check if we are reading from Secondary only
if(replSetSelf._readPreference == Server.READ_SECONDARY_ONLY && Object.keys(replSetSelf._state.secondaries).length == 0) {
if(replSetSelf._readPreference == ReadPreference.SECONDARY && Object.keys(replSetSelf._state.secondaries).length == 0) {
closeServers();
}
}
}

@@ -544,3 +636,3 @@ } else {

var closeServers = function() {
// Set the state to disconnected
// Set the state to disconnected
parent._state = 'disconnected';

@@ -552,2 +644,5 @@ // Shut down the replicaset for now and Fire off all the callbacks sitting with no reply

__executeAllCallbacksWithError(parent, err);
// Set the parent
if(typeof parent.openCalled != 'undefined')
parent.openCalled = false;
// Ensure single callback only

@@ -559,3 +654,3 @@ if(callback != null) {

// Return the error
internalCallback(err, null);
internalCallback(err, null, replSetSelf);
} else {

@@ -569,4 +664,4 @@ // If the parent has listeners trigger an event

}
}
}
// Check if this is the primary server, then disconnect otherwise keep going

@@ -576,4 +671,2 @@ if(replSetSelf._state.master != null) {

var errorServerAddress = server.name;
// var errorServerAddress = server.host + ":" + server.port;
// Only shut down the set if we have a primary server error

@@ -592,8 +685,8 @@ if(primaryAddress == errorServerAddress) {

delete replSetSelf._state.passives[errorServerAddress];
}
}
// Check if we are reading from Secondary only
if(replSetSelf._readPreference == Server.READ_SECONDARY_ONLY && Object.keys(replSetSelf._state.secondaries).length == 0) {
if(replSetSelf._readPreference == ReadPreference.SECONDARY && Object.keys(replSetSelf._state.secondaries).length == 0) {
closeServers();
}
}
}

@@ -604,3 +697,3 @@ } else {

}
// Ensure we don't have duplicate handlers

@@ -620,29 +713,2 @@ instanceServer.removeAllListeners("close");

// For each tag in tags let's add the instance Server to the list for that tag
if(tags != null && typeof tags === 'object') {
var tagKeys = Object.keys(tags);
// For each tag file in the server add it to byTags
for(var i = 0; i < tagKeys.length; i++) {
var value = tags[tagKeys[i]];
// Check if we have a top level tag object
if(replSetSelf._state.byTags[tagKeys[i]] == null) replSetSelf._state.byTags[tagKeys[i]] = {};
// For the value check if we have an array of server instances
if(!Array.isArray(replSetSelf._state.byTags[tagKeys[i]][value])) replSetSelf._state.byTags[tagKeys[i]][value] = [];
// Check that the instance is not already registered there
var valueArray = replSetSelf._state.byTags[tagKeys[i]][value];
var found = false;
// Iterate over all values
for(var j = 0; j < valueArray.length; j++) {
if(valueArray[j].host == instanceServer.host && valueArray[j].port == instanceServer.port) {
found = true;
break;
}
}
// If it was not found push the instance server to the list
if(!found) valueArray.push(instanceServer);
}
}
// Remove from error list

@@ -656,3 +722,3 @@ delete replSetSelf._state.errors[me];

if(replSetSelf.replicaSet == null) {
replSetSelf._state.setName = setName;
replSetSelf._state.setName = setName;
} else if(replSetSelf.replicaSet != setName && replSetSelf._serverState != 'disconnected') {

@@ -666,5 +732,5 @@ replSetSelf._state.errorMessages.push(new Error("configured mongodb replicaset does not match provided replicaset [" + setName + "] != [" + replSetSelf.replicaSet + "]"));

// Return error message ignoring rest of calls
return internalCallback(replSetSelf._state.errorMessages[0], parent);
return internalCallback(replSetSelf._state.errorMessages[0], parent, replSetSelf);
}
// Let's add the server to our list of server types

@@ -679,8 +745,12 @@ if(secondary == true && (passive == false || passive == null)) {

replSetSelf._state.master = instanceServer;
masterCallback = callback;
callback = null;
} else if(isMaster == false && primary != null && replSetSelf._state.addresses[primary]) {
replSetSelf._state.master = replSetSelf._state.addresses[primary];
masterCallback = callback;
callback = null;
}
// Let's go throught all the "possible" servers in the replicaset
var candidateServers = hosts.concat(arbiters).concat(passives);
var candidateServers = hosts.concat(arbiters).concat(passives);

@@ -690,6 +760,6 @@ // If we have new servers let's add them

// Fetch the server string
var candidateServerString = candidateServers[i];
var candidateServerString = candidateServers[i];
// Add the server if it's not defined and not already errored out
if(null == replSetSelf._state.addresses[candidateServerString]
&& null == replSetSelf._state.errors[candidateServerString]) {
&& null == replSetSelf._state.errors[candidateServerString]) {
// Split the server string

@@ -711,9 +781,11 @@ var parts = candidateServerString.split(/:/);

socketOptions['host'] = parts[0];
socketOptions['port'] = parseInt(parts[1]);
socketOptions['port'] = parseInt(parts[1], 10);
// Set fast connect timeout
socketOptions['connectTimeoutMS'] = replSetSelf._connectTimeoutMS
// Create a new server instance
var newServer = new Server(parts[0], parseInt(parts[1]), {auto_reconnect:false, 'socketOptions':socketOptions
var newServer = new Server(parts[0], parseInt(parts[1], 10), {auto_reconnect:false, 'socketOptions':socketOptions
, logger:replSetSelf.logger, ssl:replSetSelf.ssl, poolSize:replSetSelf.poolSize});
// Set the replicaset instance
newServer.replicasetInstance = replSetSelf;
newServer.replicasetInstance = replSetSelf;

@@ -724,3 +796,3 @@ // Add handlers

newServer.on("error", replSetSelf.errorHandler);
// Add server to list, ensuring we don't get a cascade of request to the same server

@@ -735,3 +807,3 @@ replSetSelf._state.addresses[candidateServerString] = newServer;

}
}
}
} else {

@@ -742,5 +814,23 @@ // Remove the instance from out list of servers

} else {
instanceServer.close();
delete replSetSelf._state.addresses[instanceServer.host + ":" + instanceServer.port];
}
// Check if we are ready in the next tick to allow more connections to be done
// process.nextTick(function() {
// Call back as we have a master letting the rest of the connections happen async
if(masterCallback != null) {
var internalCallback = masterCallback;
masterCallback = null;
// Fire open event
process.nextTick(function() {
// Emit the open event
parent.emit("open", null, parent);
});
internalCallback(null, parent, replSetSelf);
}
// });
// If done finish up

@@ -762,8 +852,15 @@ if((self._numberOfServersLeftToInitialize == 0) && replSetSelf._serverState === 'connecting' && replSetSelf._state.errorMessages.length == 0) {

callback = null;
// Start up ha
if(replSetSelf.haEnabled && null == replSetSelf._replicasetTimeoutId) {
replSetSelf._replicasetTimeoutId = setTimeout(replSetSelf.replicasetCheckFunction, replSetSelf.replicasetStatusCheckInterval);
// Fire open event
process.nextTick(function() {
// Emit on db parent
parent.emit("fullsetup", null, parent);
// Emit all servers done
replSetSelf.emit("fullsetup", null, parent);
});
// Callback
if(typeof internalCallback == 'function') {
internalCallback(null, parent, replSetSelf);
}
// Perform callback
internalCallback(null, parent);
})

@@ -774,8 +871,14 @@ } else {

callback = null;
// Start up ha
if(replSetSelf.haEnabled && null == replSetSelf._replicasetTimeoutId) {
replSetSelf._replicasetTimeoutId = setTimeout(replSetSelf.replicasetCheckFunction, replSetSelf.replicasetStatusCheckInterval);
// Fire open event
process.nextTick(function() {
parent.emit("fullsetup", null, parent);
// Emit all servers done
replSetSelf.emit("fullsetup", null, parent);
});
// Callback
if(typeof internalCallback == 'function') {
internalCallback(null, parent, replSetSelf);
}
// Perform callback
internalCallback(null, parent);
}

@@ -792,8 +895,14 @@ } else if(replSetSelf.readSecondary == true && Object.keys(replSetSelf._state.secondaries).length > 0) {

callback = null;
// Start up ha
if(replSetSelf.haEnabled && null == replSetSelf._replicasetTimeoutId) {
replSetSelf._replicasetTimeoutId = setTimeout(replSetSelf.replicasetCheckFunction, replSetSelf.replicasetStatusCheckInterval);
// Fire open event
process.nextTick(function() {
parent.emit("fullsetup", null, parent);
// Emit all servers done
replSetSelf.emit("fullsetup", null, parent)
});
// Callback
if(typeof internalCallback == 'function') {
internalCallback(null, parent, replSetSelf);
}
// Perform callback
internalCallback(null, parent);
})

@@ -804,10 +913,16 @@ } else {

callback = null;
// Start up ha
if(replSetSelf.haEnabled && null == replSetSelf._replicasetTimeoutId) {
replSetSelf._replicasetTimeoutId = setTimeout(replSetSelf.replicasetCheckFunction, replSetSelf.replicasetStatusCheckInterval);
// Fire open event
process.nextTick(function() {
parent.emit("fullsetup", null, parent);
// Emit all servers done
replSetSelf.emit("fullsetup", null, parent);
});
// Callback
if(typeof internalCallback == 'function') {
internalCallback(null, parent, replSetSelf);
}
// Perform callback
internalCallback(null, parent);
}
} else if(replSetSelf.readSecondary == true && Object.keys(replSetSelf._state.secondaries).length == 0) {
} else if(replSetSelf.readSecondary == true && Object.keys(replSetSelf._state.secondaries).length == 0) {
replSetSelf._serverState = 'disconnected';

@@ -820,3 +935,3 @@ // ensure no callbacks get called twice

// Perform callback
internalCallback(new Error("no secondary server found"), null);
internalCallback(new Error("no secondary server found"), null, replSetSelf);
} else if(typeof callback === 'function') {

@@ -830,4 +945,4 @@ replSetSelf._serverState = 'disconnected';

// Perform callback
internalCallback(new Error("no primary server found"), null);
}
internalCallback(new Error("no primary server found"), null, replSetSelf);
}
} else if((self._numberOfServersLeftToInitialize == 0) && replSetSelf._state.errorMessages.length > 0 && replSetSelf._serverState != 'disconnected') {

@@ -842,7 +957,7 @@ // Set done

// Callback to signal we are done
internalCallback(replSetSelf._state.errorMessages[0], null);
internalCallback(replSetSelf._state.errorMessages[0], null, replSetSelf);
}
}
}
// Ensure we have all registered servers in our set

@@ -854,3 +969,3 @@ for(var i = 0; i < serverConnections.length; i++) {

// Initialize all the connections
for(var i = 0; i < serverConnections.length; i++) {
for(var i = 0; i < serverConnections.length; i++) {
// Set up the logger for the server connection

@@ -872,2 +987,4 @@ serverConnections[i].logger = replSetSelf.logger;

socketOptions['port'] = serverConnections[i].port;
// Set fast connect timeout
socketOptions['connectTimeoutMS'] = replSetSelf._connectTimeoutMS

@@ -880,91 +997,3 @@ // Set the socket options

serverConnections[i].connect(parent, {returnIsMasterResults: true, eventReceiver:serverConnections[i]}, self.connectionHandler(serverConnections[i]));
}
// The checking function
this.replicasetCheckFunction = function() {
try {
// Retrieve a reader connection
var con = self.checkoutReader();
// If we have a connection and we have a db object
if(con != null && Array.isArray(self.dbInstances) && self.dbInstances.length > 0) {
var dbInstance = self.dbInstances[0];
dbInstance.admin().command({replSetGetStatus:1}, {connection:con}, function(err, result) {
// Paranoid android
if(null == err && null != result && null != result["documents"] && result["documents"].length > 0) {
// For each member we need to check if we have a new connection that needs to be established
var members = result['documents'][0]['members'];
if(null != members) {
// The total members we check
var newServers = 0;
// Iterate over all existing members
for(var i = 0, jlen = members.length; i < jlen; i++) {
// Get a member
var member = members[i];
// If the node is healthy and it does not exist in the current replicaset, add it to the
// current setup
if(null != self._state && 0 != member['health'] && null == self._state['addresses'][member['name']]) {
// We need to add a server to the connection, this means going through the notions of establishing
// A completely new connection
// Found a new server
newServers = newServers + 1;
// Split the server string
var parts = member.name.split(/:/);
if(parts.length == 1) {
parts = [parts[0], Connection.DEFAULT_PORT];
}
// Default empty socket options object
var socketOptions = {};
// If a socket option object exists clone it
if(self.socketOptions != null) {
var keys = Object.keys(self.socketOptions);
for(var k = 0; k < keys.length;k++) socketOptions[keys[i]] = self.socketOptions[keys[i]];
}
// Add host information to socket options
socketOptions['host'] = parts[0];
socketOptions['port'] = parseInt(parts[1]);
// Create a new server instance
var newServer = new Server(parts[0], parseInt(parts[1]), {auto_reconnect:false, 'socketOptions':socketOptions
, logger:self.logger, ssl:self.ssl, poolSize:self.poolSize});
// Set the replicaset instance
newServer.replicasetInstance = self;
// Add handlers
newServer.on("close", self.closeHandler);
newServer.on("timeout", self.timeoutHandler);
newServer.on("error", self.errorHandler);
// Add a new server to the total number of servers that need to initialized before we are done
var newServerCallback = self.connectionHandler(newServer);
// Let's set up a new server instance
newServer.connect(self.db, {returnIsMasterResults: true, eventReceiver:newServer}, function(err, result) {
// Remove from number of newServers
newServers = newServers - 1;
// Call the setup
newServerCallback(err, result);
// If we have 0 new servers let's go back to rechecking
if(newServers <= 0) {
setTimeout(self.replicasetCheckFunction, self.replicasetStatusCheckInterval);
}
});
}
}
// If we have no new servers check status again
if(newServers == 0) {
setTimeout(self.replicasetCheckFunction, self.replicasetStatusCheckInterval);
}
}
}
});
}
} catch(err) {
setTimeout(self.replicasetCheckFunction, self.replicasetStatusCheckInterval);
}
};
}
}

@@ -977,3 +1006,3 @@

// Establish connection
var connection = this._state.master != null ? this._state.master.checkoutWriter() : null;
var connection = this._state.master != null ? this._state.master.checkoutWriter() : null;
// Return the connection

@@ -986,10 +1015,119 @@ return connection;

*/
ReplSet.prototype.checkoutReader = function() {
var pickFirstConnectedSecondary = function pickFirstConnectedSecondary(self, tags) {
var keys = Object.keys(self._state.secondaries);
var connection = null;
// Find first available reader if any
for(var i = 0; i < keys.length; i++) {
connection = self._state.secondaries[keys[i]].checkoutReader();
if(connection != null) break;
}
// If we still have a null, read from primary if it's not secondary only
if(self._readPreference == ReadPreference.SECONDARY_PREFERRED) {
connection = self._state.master.checkoutReader();
}
if(connection == null) {
var preferenceName = self._readPreference == ReadPreference.SECONDARY_PREFERRED ? 'secondary' : self._readPreference;
return new Error("No replica set member available for query with ReadPreference " + preferenceName + " and tags " + JSON.stringify(tags));
}
// Return the connection
return connection;
}
/**
* @ignore
*/
var _pickFromTags = function(self, tags) {
// If we have an array or single tag selection
var tagObjects = Array.isArray(tags) ? tags : [tags];
// Iterate over all tags until we find a candidate server
for(var _i = 0; _i < tagObjects.length; _i++) {
// Grab a tag object
var tagObject = tagObjects[_i];
// Matching keys
var matchingKeys = Object.keys(tagObject);
// Match all the servers that match the provdided tags
var keys = Object.keys(self._state.secondaries);
var candidateServers = [];
for(var i = 0; i < keys.length; i++) {
var server = self._state.secondaries[keys[i]];
// If we have tags match
if(server.tags != null) {
var matching = true;
// Ensure we have all the values
for(var j = 0; j < matchingKeys.length; j++) {
if(server.tags[matchingKeys[j]] != tagObject[matchingKeys[j]]) {
matching = false;
break;
}
}
// If we have a match add it to the list of matching servers
if(matching) {
candidateServers.push(server);
}
}
}
// If we have a candidate server return
if(candidateServers.length > 0) {
if(this.strategyInstance) return this.strategyInstance.checkoutSecondary(tags, candidateServers);
// Set instance to return
return candidateServers[Math.floor(Math.random() * candidateServers.length)].checkoutReader();
}
}
// No connection found
return null;
}
/**
* @ignore
*/
ReplSet.prototype.checkoutReader = function(readPreference, tags) {
var connection = null;
// If we have a read preference object unpack it
if(typeof readPreference == 'object' && readPreference['_type'] == 'ReadPreference') {
tags = readPreference.tags;
readPreference = readPreference.mode;
} else if(typeof readPreference == 'object' && readPreference['_type'] != 'ReadPreference') {
throw new Error("read preferences must be either a string or an instance of ReadPreference");
}
// Set up our read Preference, allowing us to override the readPreference
var finalReadPreference = readPreference != null ? readPreference : this._readPreference;
finalReadPreference = finalReadPreference == true ? ReadPreference.SECONDARY_PREFERRED : finalReadPreference;
// If we are reading from a primary
if(finalReadPreference == 'primary') {
// If we provide a tags set send an error
if(typeof tags == 'object' && tags != null) {
return new Error("PRIMARY cannot be combined with tags");
}
// If we provide a tags set send an error
if(this._state.master == null) {
return new Error("No replica set primary available for query with ReadPreference PRIMARY");
}
// Checkout a writer
return this.checkoutWriter();
}
// If we have specified to read from a secondary server grab a random one and read
// from it, otherwise just pass the primary connection
if((this.readSecondary == true || this._readPreference == Server.READ_SECONDARY || this._readPreference == Server.READ_SECONDARY_ONLY) && Object.keys(this._state.secondaries).length > 0) {
// Checkout a secondary server from the passed in set of servers
if(this.strategyInstance != null) {
connection = this.strategyInstance.checkoutSecondary();
if((this.readSecondary || finalReadPreference == ReadPreference.SECONDARY_PREFERRED || finalReadPreference == ReadPreference.SECONDARY) && Object.keys(this._state.secondaries).length > 0) {
// If we have tags, look for servers matching the specific tag
if(tags != null && typeof tags == 'object') {
// Get connection
connection = _pickFromTags(this, tags);// = function(self, readPreference, tags) {
// No candidate servers that match the tags, error
if(connection == null) {
return new Error("No replica set members available for query");
}
} else {

@@ -1000,35 +1138,70 @@ // Pick a random key

var key = keys[this._currentServerChoice++];
connection = this._state.secondaries[key].checkoutReader();
// Fetch a connectio
connection = this._state.secondaries[key] != null ? this._state.secondaries[key].checkoutReader() : null;
// If connection is null fallback to first available secondary
connection = connection == null ? pickFirstConnectedSecondary(this, tags) : connection;
}
} else if(this._readPreference == Server.READ_SECONDARY_ONLY && Object.keys(this._state.secondaries).length == 0) {
connection = null;
} else if(this._readPreference != null && typeof this._readPreference === 'object') {
// Get all tag keys (used to try to find a server that is valid)
var keys = Object.keys(this._readPreference);
// final instance server
var instanceServer = null;
// for each key look for an avilable instance
for(var i = 0; i < keys.length; i++) {
// Grab subkey value
var value = this._readPreference[keys[i]];
// Check if we have any servers for the tag, if we do pick a random one
if(this._state.byTags[keys[i]] != null
&& this._state.byTags[keys[i]][value] != null
&& Array.isArray(this._state.byTags[keys[i]][value])
&& this._state.byTags[keys[i]][value].length > 0) {
// Let's grab an available server from the list using a random pick
var serverInstances = this._state.byTags[keys[i]][value];
// Set instance to return
instanceServer = serverInstances[Math.floor(Math.random() * serverInstances.length)];
break;
} else if(finalReadPreference == ReadPreference.PRIMARY_PREFERRED) {
// Check if there is a primary available and return that if possible
connection = this.checkoutWriter();
// If no connection available checkout a secondary
if(connection == null) {
// If we have tags, look for servers matching the specific tag
if(tags != null && typeof tags == 'object') {
// Get connection
connection = _pickFromTags(this, tags);// = function(self, readPreference, tags) {
// No candidate servers that match the tags, error
if(connection == null) {
return new Error("No replica set members available for query");
}
} else {
// Pick a random key
var keys = Object.keys(this._state.secondaries);
this._currentServerChoice = this._currentServerChoice % keys.length;
var key = keys[this._currentServerChoice++];
// Fetch a connectio
connection = this._state.secondaries[key] != null ? this._state.secondaries[key].checkoutReader() : null;
// If connection is null fallback to first available secondary
connection = connection == null ? pickFirstConnectedSecondary(this, tags) : connection;
}
}
// Return the instance of the server
connection = instanceServer != null ? instanceServer.checkoutReader() : this.checkoutWriter();
} else if(finalReadPreference == ReadPreference.SECONDARY_PREFERRED && tags == null && Object.keys(this._state.secondaries).length == 0) {
connection = this.checkoutWriter();
// If no connection return an error
if(connection == null) {
var preferenceName = finalReadPreference == ReadPreference.SECONDARY ? 'secondary' : finalReadPreference;
connection = new Error("No replica set member available for query with ReadPreference " + preferenceName + " and tags " + JSON.stringify(tags));
}
} else if(finalReadPreference == ReadPreference.SECONDARY_PREFERRED) {
// If we have tags, look for servers matching the specific tag
if(tags != null && typeof tags == 'object') {
// Get connection
connection = _pickFromTags(this, tags);// = function(self, readPreference, tags) {
// No candidate servers that match the tags, error
if(connection == null) {
// No secondary server avilable, attemp to checkout a primary server
connection = this.checkoutWriter();
// If no connection return an error
if(connection == null) {
return new Error("No replica set members available for query");
}
}
} else if(this.strategyInstance != null) {
connection = this.strategyInstance.checkoutReader(tags);
}
} else if(finalReadPreference == ReadPreference.NEAREST && this.strategyInstance != null) {
connection = this.strategyInstance.checkoutSecondary(tags);
} else if(finalReadPreference == ReadPreference.NEAREST && this.strategyInstance == null) {
return new Error("A strategy for calculating nearness must be enabled such as ping or statistical");
} else if(finalReadPreference == ReadPreference.SECONDARY && Object.keys(this._state.secondaries).length == 0) {
if(tags != null && typeof tags == 'object') {
var preferenceName = finalReadPreference == ReadPreference.SECONDARY ? 'secondary' : finalReadPreference;
connection = new Error("No replica set member available for query with ReadPreference " + preferenceName + " and tags " + JSON.stringify(tags));
} else {
connection = new Error("No replica set secondary available for query with ReadPreference SECONDARY");
}
} else {
connection = this.checkoutWriter();
}
// Return the connection

@@ -1048,3 +1221,3 @@ return connection;

allConnections = allConnections.concat(allMasterConnections);
// If we have read secondary let's add all secondary servers

@@ -1062,3 +1235,3 @@ if(this.readSecondary && Object.keys(this._state.secondaries).length > 0) {

}
// Return all the conections

@@ -1074,3 +1247,3 @@ return allConnections;

this.recordQueryStats = enable;
// Ensure all existing servers already have the flag set, even if the
// Ensure all existing servers already have the flag set, even if the
// connections are up already or we have not connected yet

@@ -1101,5 +1274,5 @@ if(this._state != null && this._state.addresses != null) {

ReplSet.prototype.close = function(callback) {
var self = this;
var self = this;
// Set server status as disconnected
this._serverState = 'disconnected';
this._serverState = 'disconnected';
// Get all the server instances and close them

@@ -1112,5 +1285,6 @@ var allServers = [];

allServers.push(this._state.addresses[keys[i]]);
}
}
}
// Let's process all the closing

@@ -1128,24 +1302,17 @@ var numberOfServersToClose = allServers.length;

var server = allServers[i];
if(server.isConnected()) {
// Close each server
server.close(function() {
numberOfServersToClose = numberOfServersToClose - 1;
// Clear out state if we are done
if(numberOfServersToClose == 0) {
// Clear out state
self._state = {'master':null, 'secondaries':{}, 'arbiters':{}, 'passives':{}, 'errors':{}, 'addresses':{}, 'byTags':{}, 'setName':null, 'errorMessages':[], 'members':[]};
}
// Close each server
server.close(function() {
numberOfServersToClose = numberOfServersToClose - 1;
// Clear out state if we are done
if(numberOfServersToClose == 0) {
// Clear out state
self._state = {'master':null, 'secondaries':{}, 'arbiters':{}, 'passives':{}
, 'errors':{}, 'addresses':{}, 'setName':null, 'errorMessages':[], 'members':[]};
}
// If we are finished perform the call back
if(numberOfServersToClose == 0 && typeof callback === 'function') {
callback(null);
}
})
} else {
numberOfServersToClose = numberOfServersToClose - 1;
// If we have no more servers perform the callback
// If we are finished perform the call back
if(numberOfServersToClose == 0 && typeof callback === 'function') {
callback(null);
callback(null);
}
}
})
}

@@ -1171,5 +1338,5 @@ }

if(this._readPreference == null && this.readSecondary) {
return Server.READ_SECONDARY;
return ReadPreference.SECONDARY_PREFERRED;
} else if(this._readPreference == null && !this.readSecondary) {
return Server.READ_PRIMARY;
return ReadPreference.PRIMARY;
} else {

@@ -1179,3 +1346,3 @@ return this._readPreference;

}
});
});

@@ -1189,3 +1356,3 @@ /**

var servers = this.allServerInstances();
return servers[0].dbInstances;
return servers.length > 0 ? servers[0].dbInstances : [];
}

@@ -1229,3 +1396,3 @@ })

Object.defineProperty(ReplSet.prototype, "secondaries", {enumerable: true
, get: function() {
, get: function() {
var keys = Object.keys(this._state.secondaries);

@@ -1246,3 +1413,3 @@ var array = new Array(keys.length);

Object.defineProperty(ReplSet.prototype, "allSecondaries", {enumerable: true
, get: function() {
, get: function() {
return this.secondaries.concat(this.passives);

@@ -1292,3 +1459,3 @@ }

}
});
});

@@ -1295,0 +1462,0 @@ /**

var Connection = require('./connection').Connection,
ReadPreference = require('./read_preference').ReadPreference,
DbCommand = require('../commands/db_command').DbCommand,

@@ -12,9 +13,10 @@ MongoReply = require('../responses/mongo_reply').MongoReply,

* Options
* - **readPreference** {String, default:null}, set's the read preference (Server.READ_PRIMAR, Server.READ_SECONDARY_ONLY, Server.READ_SECONDARY)
* - **readPreference** {String, default:null}, set's the read preference (ReadPreference.PRIMARY, ReadPreference.PRIMARY_PREFERRED, ReadPreference.SECONDARY, ReadPreference.SECONDARY_PREFERRED, ReadPreference.NEAREST)
* - **ssl** {Boolean, default:false}, use ssl connection (needs to have a mongod server with ssl support)
* - **slaveOk** {Boolean, default:false}, legacy option allowing reads from secondary, use **readPrefrence** instead.
* - **poolSize** {Number, default:1}, number of connections in the connection pool, set to 1 as default for legacy reasons.
* - **socketOptions** {Object, default:null}, an object containing socket options to use (noDelay:(boolean), keepAlive:(number), timeout:(number))
* - **socketOptions** {Object, default:null}, an object containing socket options to use (noDelay:(boolean), keepAlive:(number), connectTimeoutMS:(number), socketTimeoutMS:(number))
* - **logger** {Object, default:null}, an object representing a logger that you want to use, needs to support functions debug, log, error **({error:function(message, object) {}, log:function(message, object) {}, debug:function(message, object) {}})**.
* - **auto_reconnect** {Boolean, default:false}, reconnect on error.
* - **disableDriverBSONSizeCheck** {Boolean, default:false}, force the server to error if the BSON message is to big
*

@@ -28,6 +30,6 @@ * @class Represents a Server connection.

// Set up event emitter
EventEmitter.call(this);
EventEmitter.call(this);
// Set up Server instance
if(!(this instanceof Server)) return new Server(host, port, options);
var self = this;

@@ -40,22 +42,23 @@ this.host = host;

this.connected = false;
this.poolSize = this.options.poolSize == null ? 1 : this.options.poolSize;
this.poolSize = this.options.poolSize == null ? 5 : this.options.poolSize;
this.disableDriverBSONSizeCheck = this.options.disableDriverBSONSizeCheck != null ? this.options.disableDriverBSONSizeCheck : false;
this.ssl = this.options.ssl == null ? false : this.options.ssl;
this.slaveOk = this.options["slave_ok"];
this._used = false;
// Get the readPreference
var readPreference = this.options['readPreference'];
var readPreference = this.options['readPreference'];
// Read preference setting
if(readPreference != null) {
if(readPreference != Server.READ_PRIMARY && readPreference != Server.READ_SECONDARY_ONLY
&& readPreference != Server.READ_SECONDARY) {
if(readPreference != ReadPreference.PRIMARY && readPreference != ReadPreference.SECONDARY
&& readPreference != ReadPreference.SECONDARY_PREFERRED) {
throw new Error("Illegal readPreference mode specified, " + readPreference);
}
// Set read Preference
this._readPreference = readPreference;
} else {
this._readPreference = null;
this._readPreference = null;
}
// Contains the isMaster information returned from the server

@@ -66,2 +69,3 @@ this.isMasterDoc;

this.socketOptions = this.options.socketOptions != null ? this.options.socketOptions : {};
if(this.disableDriverBSONSizeCheck) this.socketOptions.disableDriverBSONSizeCheck = this.disableDriverBSONSizeCheck;
// Set ssl up if it's defined

@@ -71,8 +75,8 @@ if(this.ssl) {

}
// Set up logger if any set
this.logger = this.options.logger != null
&& (typeof this.options.logger.debug == 'function')
&& (typeof this.options.logger.error == 'function')
&& (typeof this.options.logger.log == 'function')
this.logger = this.options.logger != null
&& (typeof this.options.logger.debug == 'function')
&& (typeof this.options.logger.error == 'function')
&& (typeof this.options.logger.log == 'function')
? this.options.logger : {error:function(message, object) {}, log:function(message, object) {}, debug:function(message, object) {}};

@@ -86,5 +90,5 @@

// Contains state information about server connection
this._state = {'runtimeStats': {'queryStats':new RunningStats()}};
this._state = {'runtimeStats': {'queryStats':new RunningStats()}};
// Do we record server stats or not
this.recordQueryStats = false;
this.recordQueryStats = false;
};

@@ -97,7 +101,14 @@

inherits(Server, EventEmitter);
// Read Preferences
Server.READ_PRIMARY = 'primary';
Server.READ_SECONDARY = 'secondary';
Server.READ_SECONDARY_ONLY = 'secondaryOnly';
// Always ourselves
//
// Deprecated, USE ReadPreferences class
//
Server.READ_PRIMARY = ReadPreference.PRIMARY;
Server.READ_SECONDARY = ReadPreference.SECONDARY_PREFERRED;
Server.READ_SECONDARY_ONLY = ReadPreference.SECONDARY;
/**
* Always ourselves
* @ignore
*/
Server.prototype.setReadPreference = function() {}

@@ -108,3 +119,10 @@

*/
Server.prototype._isUsed = function() {
Server.prototype.isMongos = function() {
return this.isMasterDoc != null && this.isMasterDoc['msg'] == "isdbgrid" ? true : false;
}
/**
* @ignore
*/
Server.prototype._isUsed = function() {
return this._used;

@@ -116,3 +134,3 @@ }

*/
Server.prototype.close = function(callback) {
Server.prototype.close = function(callback) {
// Remove all local listeners

@@ -129,3 +147,3 @@ this.removeAllListeners();

// Set server status as disconnected
this._serverState = 'disconnected';
this._serverState = 'disconnected';
// Peform callback if present

@@ -153,3 +171,3 @@ if(typeof callback === 'function') callback();

Server.prototype.isSetMember = function() {
return this['replicasetInstance'] != null;
return this['replicasetInstance'] != null || this['mongosInstance'] != null;
}

@@ -161,6 +179,6 @@

Server.prototype.connect = function(dbInstance, options, callback) {
if('function' === typeof options) callback = options, options = {};
if('function' === typeof options) callback = options, options = {};
if(options == null) options = {};
if(!('function' === typeof callback)) callback = null;
// Currently needed to work around problems with multiple connections in a pool with ssl

@@ -170,9 +188,9 @@ // TODO fix if possible

// Set up socket options for ssl
this.socketOptions.ssl = true;
this.socketOptions.ssl = true;
}
// Let's connect
var server = this;
// Let's us override the main receiver of events
var eventReceiver = options.eventReceiver != null ? options.eventReceiver : this;
var eventReceiver = options.eventReceiver != null ? options.eventReceiver : this;
// Creating dbInstance

@@ -183,2 +201,5 @@ this.dbInstance = dbInstance;

// Force connection pool if there is one
if(server.connectionPool) server.connectionPool.stop();
// Set server state to connecting

@@ -192,11 +213,11 @@ this._serverState = 'connecting';

connectionPool.logger = this.logger;
// Set up a new pool using default settings
server.connectionPool = connectionPool;
// Set basic parameters passed in
var returnIsMasterResults = options.returnIsMasterResults == null ? false : options.returnIsMasterResults;
// Create a default connect handler, overriden when using replicasets
var connectCallback = function(err, reply) {
var connectCallback = function(err, reply) {
// ensure no callbacks get called twice

@@ -207,3 +228,3 @@ var internalCallback = callback;

// proxy killed connection etc, ignore the erorr as close event was isssued
if(err != null && internalCallback == null) return;
if(err != null && internalCallback == null) return;
// Internal callback

@@ -217,18 +238,21 @@ if(err != null) return internalCallback(err, null);

server.isMasterDoc = reply.documents[0];
// Emit open event
_emitAcrossAllDbInstances(server, eventReceiver, "open", null, returnIsMasterResults ? reply : dbInstance, null);
// If we have it set to returnIsMasterResults
if(returnIsMasterResults) {
internalCallback(null, reply);
internalCallback(null, reply, server);
} else {
internalCallback(null, dbInstance);
internalCallback(null, dbInstance, server);
}
};
// Let's us override the main connect callback
var connectHandler = options.connectHandler == null ? connectCallback : options.connectHandler;
var connectHandler = options.connectHandler == null ? connectCallback : options.connectHandler;
// Set up on connect method
connectionPool.on("poolReady", function() {
connectionPool.on("poolReady", function() {
// Create db command and Add the callback to the list of callbacks by the request id (mapping outgoing messages to correct callbacks)
var db_command = DbCommand.NcreateIsMasterCommand(dbInstance, dbInstance.databaseName);
var db_command = DbCommand.NcreateIsMasterCommand(dbInstance, dbInstance.databaseName);
// Check out a reader from the pool

@@ -241,3 +265,3 @@ var connection = connectionPool.checkoutConnection();

dbInstance._registerHandler(db_command, false, connection, connectHandler);
// Write the command out

@@ -254,3 +278,3 @@ connection.write(db_command);

// Parse the header
mongoReply.parseHeader(message, connectionPool.bson)
mongoReply.parseHeader(message, connectionPool.bson)
// If message size is not the same as the buffer size

@@ -260,12 +284,12 @@ // something went terribly wrong somewhere

// Emit the error
if(eventReceiver.listeners["error"] && eventReceiver.listeners["error"].length > 0) eventReceiver.emit("error", new Error("bson length is different from message length"), server);
if(eventReceiver.listeners("error") && eventReceiver.listeners("error").length > 0) eventReceiver.emit("error", new Error("bson length is different from message length"), server);
// Remove all listeners
server.removeAllListeners();
} else {
} else {
var startDate = new Date().getTime();
// Callback instance
var callbackInfo = null;
var dbInstanceObject = null;
// Locate a callback instance and remove any additional ones

@@ -283,3 +307,3 @@ for(var i = 0; i < server.dbInstances.length; i++) {

}
// Only execute callback if we have a caller

@@ -293,3 +317,3 @@ if(callbackInfo.callback && Array.isArray(callbackInfo.info.chained)) {

}
// If we have already fired then clean up rest of chain and move on

@@ -300,9 +324,43 @@ if(numberOfFoundCallbacks != chained.length) {

}
// Just return from function
return;
}
// Parse the body
mongoReply.parseBody(message, connectionPool.bson, callbackInfo.info.raw, function(err) {
if(err != null) {
// If pool connection is already closed
if(server._serverState === 'disconnected') return;
// Set server state to disconnected
server._serverState = 'disconnected';
// Remove all listeners and close the connection pool
server.removeAllListeners();
connectionPool.stop(true);
// If we have a callback return the error
if(typeof callback === 'function') {
// ensure no callbacks get called twice
var internalCallback = callback;
callback = null;
// Perform callback
internalCallback(new Error("connection closed due to parseError"), null, server);
} else if(server.isSetMember()) {
if(server.listeners("parseError") && server.listeners("parseError").length > 0) server.emit("parseError", new Error("connection closed due to parseError"), server);
} else {
if(eventReceiver.listeners("parseError") && eventReceiver.listeners("parseError").length > 0) eventReceiver.emit("parseError", new Error("connection closed due to parseError"), server);
}
// If we are a single server connection fire errors correctly
if(!server.isSetMember()) {
// Fire all callback errors
_fireCallbackErrors(server, new Error("connection closed due to parseError"));
// Emit error
_emitAcrossAllDbInstances(server, eventReceiver, "parseError", server, null, true);
}
// Short cut
return;
}
// Fetch the callback
var callbackInfo = dbInstanceObject._findHandler(mongoReply.responseTo.toString());

@@ -319,3 +377,3 @@ // If we have an error let's execute the callback and clean up all other

var chainedIds = callbackInfo.info.chained;
if(chainedIds.length > 0 && chainedIds[chainedIds.length - 1] == mongoReply.responseTo) {

@@ -325,3 +383,3 @@ // Cleanup all other chained calls

// Remove listeners
for(var i = 0; i < chainedIds.length; i++) dbInstanceObject._removeHandler(chainedIds[i]);
for(var i = 0; i < chainedIds.length; i++) dbInstanceObject._removeHandler(chainedIds[i]);
// Call the handler

@@ -333,3 +391,3 @@ dbInstanceObject._callHandler(mongoReply.responseTo, callbackInfo.info.results.shift(), null);

var handler = dbInstanceObject._findHandler(chainedIds[i]);
// Check if we have an object, if it's the case take the current object commands and
// Check if we have an object, if it's the case take the current object commands and
// and add this one

@@ -340,21 +398,54 @@ if(handler.info != null) {

}
}
}
}
}
});
});
} else if(callbackInfo.callback) {
// Parse the body
mongoReply.parseBody(message, connectionPool.bson, callbackInfo.info.raw, function(err) {
if(err != null) {
// If pool connection is already closed
if(server._serverState === 'disconnected') return;
// Set server state to disconnected
server._serverState = 'disconnected';
// Remove all listeners and close the connection pool
server.removeAllListeners();
connectionPool.stop(true);
// If we have a callback return the error
if(typeof callback === 'function') {
// ensure no callbacks get called twice
var internalCallback = callback;
callback = null;
// Perform callback
internalCallback(new Error("connection closed due to parseError"), null, server);
} else if(server.isSetMember()) {
if(server.listeners("parseError") && server.listeners("parseError").length > 0) server.emit("parseError", new Error("connection closed due to parseError"), server);
} else {
if(eventReceiver.listeners("parseError") && eventReceiver.listeners("parseError").length > 0) eventReceiver.emit("parseError", new Error("connection closed due to parseError"), server);
}
// If we are a single server connection fire errors correctly
if(!server.isSetMember()) {
// Fire all callback errors
_fireCallbackErrors(server, new Error("connection closed due to parseError"));
// Emit error
_emitAcrossAllDbInstances(server, eventReceiver, "parseError", server, null, true);
}
// Short cut
return;
}
// Let's record the stats info if it's enabled
if(server.recordQueryStats == true && server._state['runtimeStats'] != null
if(server.recordQueryStats == true && server._state['runtimeStats'] != null
&& server._state.runtimeStats['queryStats'] instanceof RunningStats) {
// Add data point to the running statistics object
server._state.runtimeStats.queryStats.push(new Date().getTime() - callbackInfo.info.start);
}
}
// Trigger the callback
dbInstanceObject._callHandler(mongoReply.responseTo, mongoReply, null);
});
}
}
});
}
}
} catch (err) {

@@ -365,5 +456,5 @@ // Throw error in next tick

})
}
}
});
// Handle timeout

@@ -375,4 +466,2 @@ connectionPool.on("timeout", function(err) {

server._serverState = 'disconnected';
// // Close the pool
// connectionPool.stop();
// If we have a callback return the error

@@ -384,20 +473,20 @@ if(typeof callback === 'function') {

// Perform callback
internalCallback(err, null);
internalCallback(err, null, server);
} else if(server.isSetMember()) {
if(server.listeners["timeout"] && server.listeners["timeout"].length > 0) server.emit("timeout", err, server);
if(server.listeners("timeout") && server.listeners("timeout").length > 0) server.emit("timeout", err, server);
} else {
if(eventReceiver.listeners["timeout"] && eventReceiver.listeners["timeout"].length > 0) eventReceiver.emit("timeout", err, server);
if(eventReceiver.listeners("timeout") && eventReceiver.listeners("timeout").length > 0) eventReceiver.emit("timeout", err, server);
}
// If we are a single server connection fire errors correctly
if(!server.isSetMember()) {
// Fire all callback errors
_fireCallbackErrors(server, err);
_fireCallbackErrors(server, err);
// Emit error
_emitAcrossAllDbInstances(server, eventReceiver, "timeout", err, server);
_emitAcrossAllDbInstances(server, eventReceiver, "timeout", err, server, true);
}
});
// Handle errors
connectionPool.on("error", function(message) {
connectionPool.on("error", function(message) {
// If pool connection is already closed

@@ -408,3 +497,3 @@ if(server._serverState === 'disconnected') return;

// If we have a callback return the error
if(typeof callback === 'function') {// && !server.isSetMember()) {
if(typeof callback === 'function') {
// ensure no callbacks get called twice

@@ -414,18 +503,18 @@ var internalCallback = callback;

// Perform callback
internalCallback(new Error(message && message.err ? message.err : message), null);
internalCallback(new Error(message && message.err ? message.err : message), null, server);
} else if(server.isSetMember()) {
if(server.listeners["error"] && server.listeners["error"].length > 0) server.emit("error", new Error(message && message.err ? message.err : message), server);
if(server.listeners("error") && server.listeners("error").length > 0) server.emit("error", new Error(message && message.err ? message.err : message), server);
} else {
if(eventReceiver.listeners["error"] && eventReceiver.listeners["error"].length > 0) eventReceiver.emit("error", new Error(message && message.err ? message.err : message), server);
if(eventReceiver.listeners("error") && eventReceiver.listeners("error").length > 0) eventReceiver.emit("error", new Error(message && message.err ? message.err : message), server);
}
// If we are a single server connection fire errors correctly
if(!server.isSetMember()) {
// Fire all callback errors
_fireCallbackErrors(server, new Error(message && message.err ? message.err : message));
_fireCallbackErrors(server, new Error(message && message.err ? message.err : message));
// Emit error
_emitAcrossAllDbInstances(server, eventReceiver, "error", new Error(message && message.err ? message.err : message), server);
_emitAcrossAllDbInstances(server, eventReceiver, "error", new Error(message && message.err ? message.err : message), server, true);
}
});
// Handle close events

@@ -437,4 +526,2 @@ connectionPool.on("close", function() {

server._serverState = 'disconnected';
// // Close the pool
// connectionPool.stop(true);
// If we have a callback return the error

@@ -446,15 +533,15 @@ if(typeof callback == 'function') {

// Perform callback
internalCallback(new Error("connection closed"), null);
internalCallback(new Error("connection closed"), null, server);
} else if(server.isSetMember()) {
if(server.listeners["close"] && server.listeners["close"].length > 0) server.emit("close", new Error("connection closed"), server);
if(server.listeners("close") && server.listeners("close").length > 0) server.emit("close", new Error("connection closed"), server);
} else {
if(eventReceiver.listeners["close"] && eventReceiver.listeners["close"].length > 0) eventReceiver.emit("close", new Error("connection closed"), server);
if(eventReceiver.listeners("close") && eventReceiver.listeners("close").length > 0) eventReceiver.emit("close", new Error("connection closed"), server);
}
// If we are a single server connection fire errors correctly
if(!server.isSetMember()) {
// Fire all callback errors
_fireCallbackErrors(server, new Error("connection closed"));
_fireCallbackErrors(server, new Error("connection closed"));
// Emit error
_emitAcrossAllDbInstances(server, eventReceiver, "close", server);
_emitAcrossAllDbInstances(server, eventReceiver, "close", server, null, true);
}

@@ -465,3 +552,3 @@ });

// error
connectionPool.on("parseError", function(message) {
connectionPool.on("parseError", function(message) {
// If pool connection is already closed

@@ -479,18 +566,18 @@ if(server._serverState === 'disconnected') return;

// Perform callback
internalCallback(new Error("connection closed due to parseError"), null);
internalCallback(new Error("connection closed due to parseError"), null, server);
} else if(server.isSetMember()) {
if(server.listeners["parseError"] && server.listeners["parseError"].length > 0) server.emit("parseError", new Error("connection closed due to parseError"), server);
if(server.listeners("parseError") && server.listeners("parseError").length > 0) server.emit("parseError", new Error("connection closed due to parseError"), server);
} else {
if(eventReceiver.listeners["parseError"] && eventReceiver.listeners["parseError"].length > 0) eventReceiver.emit("parseError", new Error("connection closed due to parseError"), server);
if(eventReceiver.listeners("parseError") && eventReceiver.listeners("parseError").length > 0) eventReceiver.emit("parseError", new Error("connection closed due to parseError"), server);
}
// If we are a single server connection fire errors correctly
if(!server.isSetMember()) {
// Fire all callback errors
_fireCallbackErrors(server, new Error("connection closed due to parseError"));
_fireCallbackErrors(server, new Error("connection closed due to parseError"));
// Emit error
_emitAcrossAllDbInstances(server, eventReceiver, "parseError", server);
_emitAcrossAllDbInstances(server, eventReceiver, "parseError", server, null, true);
}
});
// Boot up connection poole, pass in a locator of callbacks

@@ -518,8 +605,7 @@ connectionPool.start();

// Only callback once and the last one is the right one
var finalCallback = chained.pop();
// console.dir(finalCallback)
var finalCallback = chained.pop();
if(info.connection.socketOptions.host === server.host && info.connection.socketOptions.port === server.port) {
dbInstance._callBackStore.emit(finalCallback, err, null);
}
}
// Put back the final callback to ensure we don't call all commands in the chain

@@ -535,6 +621,6 @@ chained.push(finalCallback);

dbInstance._callBackStore.emit(keys[j], err, null);
}
}
}
}
}
}
}
}

@@ -545,3 +631,3 @@

*/
var _emitAcrossAllDbInstances = function(server, filterDb, event, message, object) {
var _emitAcrossAllDbInstances = function(server, filterDb, event, message, object, resetConnection) {
// Emit close event across all db instances sharing the sockets

@@ -552,8 +638,13 @@ var allServerInstances = server.allServerInstances();

// For all db instances signal all db instances
if(Array.isArray(serverInstance.dbInstances) && serverInstance.dbInstances.length > 1) {
if(Array.isArray(serverInstance.dbInstances) && serverInstance.dbInstances.length >= 1) {
for(var i = 0; i < serverInstance.dbInstances.length; i++) {
var dbInstance = serverInstance.dbInstances[i];
// Set the parent
if(resetConnection && typeof dbInstance.openCalled != 'undefined')
dbInstance.openCalled = false;
// Check if it's our current db instance and skip if it is
if(filterDb == null || filterDb.databaseName !== dbInstance.databaseName || filterDb.tag !== dbInstance.tag) {
dbInstance.emit(event, message, object);
// Only emit if there is a listener
if(dbInstance.listeners(event).length > 0)
dbInstance.emit(event, message, object);
}

@@ -569,3 +660,3 @@ }

return this.connectionPool.getAllConnections();
}
}

@@ -576,3 +667,3 @@ /**

*/
var canCheckoutWriter = function(self, read) {
var canCheckoutWriter = function(self, read) {
// We cannot write to an arbiter or secondary server

@@ -583,6 +674,6 @@ if(self.isMasterDoc['arbiterOnly'] == true) {

return new Error("Cannot write to a secondary");
} else if(read == true && self._readPreference == Server.READ_SECONDARY_ONLY && self.isMasterDoc['ismaster'] == true) {
} else if(read == true && self._readPreference == ReadPreference.SECONDARY && self.isMasterDoc['ismaster'] == true) {
return new Error("Cannot read from primary when secondary only specified");
}
// Return no error

@@ -596,11 +687,13 @@ return null;

Server.prototype.checkoutWriter = function(read) {
if(read == true) return this.connectionPool.checkoutConnection();
if(read == true) return this.connectionPool.checkoutConnection();
// Check if are allowed to do a checkout (if we try to use an arbiter f.ex)
var result = canCheckoutWriter(this, read);
// If the result is null check out a writer
if(result == null) {
return this.connectionPool.checkoutConnection();
if(result == null && this.connectionPool != null) {
return this.connectionPool.checkoutConnection();
} else if(result == null) {
return null;
} else {
return result;
}
}
}

@@ -612,15 +705,15 @@

*/
var canCheckoutReader = function(self) {
var canCheckoutReader = function(self) {
// We cannot write to an arbiter or secondary server
if(self.isMasterDoc['arbiterOnly'] == true) {
if(self.isMasterDoc && self.isMasterDoc['arbiterOnly'] == true) {
return new Error("Cannot write to an arbiter");
} else if(self._readPreference != null) {
// If the read preference is Primary and the instance is not a master return an error
if(self._readPreference == Server.READ_PRIMARY && self.isMasterDoc['ismaster'] != true) {
return new Error("Read preference is " + Server.READ_PRIMARY + " and server is not master");
} else if(self._readPreference == Server.READ_SECONDARY_ONLY && self.isMasterDoc['ismaster'] == true) {
return new Error("Cannot read from primary when secondary only specified");
if((self._readPreference == ReadPreference.PRIMARY) && self.isMasterDoc['ismaster'] != true) {
return new Error("Read preference is Server.PRIMARY and server is not master");
} else if(self._readPreference == ReadPreference.SECONDARY && self.isMasterDoc['ismaster'] == true) {
return new Error("Cannot read from primary when secondary only specified");
}
}
// Return no error

@@ -637,7 +730,9 @@ return null;

// If the result is null check out a writer
if(result == null) {
return this.connectionPool.checkoutConnection();
if(result == null && this.connectionPool != null) {
return this.connectionPool.checkoutConnection();
} else if(result == null) {
return null;
} else {
return result;
}
}
}

@@ -653,3 +748,3 @@

/**
* Internal statistics object used for calculating average and standard devitation on
* Internal statistics object used for calculating average and standard devitation on
* running queries

@@ -664,3 +759,3 @@ * @ignore

this.m_newM = 0.0;
this.m_newS = 0.0;
this.m_newS = 0.0;

@@ -685,6 +780,6 @@ // Define getters

Object.defineProperty(this, "sScore", { enumerable: true
, get: function () {
, get: function () {
var bottom = this.mean + this.standardDeviation;
if(bottom == 0) return 0;
return ((2 * this.mean * this.standardDeviation)/(bottom));
return ((2 * this.mean * this.standardDeviation)/(bottom));
}

@@ -709,4 +804,4 @@ });

// set up for next iteration
this.m_oldM = this.m_newM;
this.m_oldS = this.m_newS;
this.m_oldM = this.m_newM;
this.m_oldS = this.m_newS;
}

@@ -722,3 +817,3 @@ }

}
});
});

@@ -735,3 +830,3 @@ /**

}
});
});

@@ -748,3 +843,3 @@ /**

}
});
});

@@ -758,3 +853,3 @@ /**

}
});
});

@@ -769,3 +864,3 @@ /**

}
});
});

@@ -779,3 +874,3 @@ /**

}
});
});

@@ -796,3 +891,3 @@ /**

}
});
});

@@ -799,0 +894,0 @@ /**

@@ -6,4 +6,5 @@ var Server = require("../server").Server;

// return time for the db command {ping:true}
var PingStrategy = exports.PingStrategy = function(replicaset) {
var PingStrategy = exports.PingStrategy = function(replicaset, secondaryAcceptableLatencyMS) {
this.replicaset = replicaset;
this.secondaryAcceptableLatencyMS = secondaryAcceptableLatencyMS;
this.state = 'disconnected';

@@ -18,7 +19,7 @@ // Class instance

// Start ping server
this._pingServer(callback);
this._pingServer(callback);
}
// Stops and kills any processes running
PingStrategy.prototype.stop = function(callback) {
PingStrategy.prototype.stop = function(callback) {
// Stop the ping process

@@ -30,30 +31,77 @@ this.state = 'disconnected';

PingStrategy.prototype.checkoutSecondary = function() {
// Get all secondary server keys
var keys = Object.keys(this.replicaset._state.secondaries);
// Contains the picked instance
var minimumPingMs = null;
var selectedInstance = null;
// Pick server key by the lowest ping time
for(var i = 0; i < keys.length; i++) {
// Fetch a server
var server = this.replicaset._state.secondaries[keys[i]];
// If we don't have a ping time use it
if(server.runtimeStats['pingMs'] == null) {
// Set to 0 ms for the start
server.runtimeStats['pingMs'] = 0;
// Pick server
selectedInstance = server;
break;
} else {
// If the next server's ping time is less than the current one choose than one
if(minimumPingMs == null || server.runtimeStats['pingMs'] < minimumPingMs) {
minimumPingMs = server.runtimeStats['pingMs'];
selectedInstance = server;
PingStrategy.prototype.checkoutSecondary = function(tags, secondaryCandidates) {
// Servers are picked based on the lowest ping time and then servers that lower than that + secondaryAcceptableLatencyMS
// Create a list of candidat servers, containing the primary if available
var candidateServers = [];
// If we have not provided a list of candidate servers use the default setup
if(!Array.isArray(secondaryCandidates)) {
candidateServers = this.replicaset._state.master != null ? [this.replicaset._state.master] : [];
// Add all the secondaries
var keys = Object.keys(this.replicaset._state.secondaries);
for(var i = 0; i < keys.length; i++) {
candidateServers.push(this.replicaset._state.secondaries[keys[i]])
}
} else {
candidateServers = secondaryCandidates;
}
// Final list of eligable server
var finalCandidates = [];
// If we have tags filter by tags
if(tags != null && typeof tags == 'object') {
// If we have an array or single tag selection
var tagObjects = Array.isArray(tags) ? tags : [tags];
// Iterate over all tags until we find a candidate server
for(var _i = 0; _i < tagObjects.length; _i++) {
// Grab a tag object
var tagObject = tagObjects[_i];
// Matching keys
var matchingKeys = Object.keys(tagObject);
// Remove any that are not tagged correctly
for(var i = 0; i < candidateServers.length; i++) {
var server = candidateServers[i];
// If we have tags match
if(server.tags != null) {
var matching = true;
// Ensure we have all the values
for(var j = 0; j < matchingKeys.length; j++) {
if(server.tags[matchingKeys[j]] != tagObject[matchingKeys[j]]) {
matching = false;
break;
}
}
// If we have a match add it to the list of matching servers
if(matching) {
finalCandidates.push(server);
}
}
}
}
} else {
// Final array candidates
var finalCandidates = candidateServers;
}
// Return the selected instance
return selectedInstance != null ? selectedInstance.checkoutReader() : null;
// Sort by ping time
finalCandidates.sort(function(a, b) {
return a.runtimeStats['pingMs'] > b.runtimeStats['pingMs'];
});
// Cut off the array of anything slower than the [0].pingMs + secondaryAcceptableLatencyMS
for(var i = 0; i < finalCandidates.length; i++) {
if(finalCandidates[i].runtimeStats['pingMs'] > finalCandidates[0].runtimeStats['pingMs'] + this.secondaryAcceptableLatencyMS) {
// Slice out the array and break
finalCandidates = finalCandidates.slice(i);
break;
}
}
// If no candidates available return an error
if(finalCandidates.length == 0) return new Error("No replica set members available for query");
// Pick a random server
return finalCandidates[Math.round(Math.random(1000000) * (finalCandidates.length - 1))].checkoutReader();
}

@@ -63,3 +111,3 @@

var self = this;
// Ping server function

@@ -83,10 +131,10 @@ var pingFunction = function() {

// Add error listener
db.on("error", function(err) {
db.on("error", function(err) {
// Adjust the number of checks
numberOfEntries = numberOfEntries - 1;
// Close connection
db.close();
db.close();
// If we are done with all results coming back trigger ping again
if(numberOfEntries == 0 && self.state == 'connected') {
setTimeout(pingFunction, 1000);
setTimeout(pingFunction, 1000);
}

@@ -107,15 +155,15 @@ })

// Get end time of the command
var endTime = new Date().getTime();
var endTime = new Date().getTime();
// Store the ping time in the server instance state variable, if there is one
if(serverInstance != null && serverInstance.runtimeStats != null && serverInstance.isConnected()) {
serverInstance.runtimeStats['pingMs'] = (endTime - startTime);
serverInstance.runtimeStats['pingMs'] = (endTime - startTime);
}
// Close server
p_db.close();
p_db.close();
// If we are done with all results coming back trigger ping again
if(numberOfEntries == 0 && self.state == 'connected') {
setTimeout(pingFunction, 1000);
setTimeout(pingFunction, 1000);
}
})
})
}

@@ -126,7 +174,7 @@ })

}
// Start pingFunction
setTimeout(pingFunction, 1000);
// Do the callback
// Do the callback
callback(null);
}

@@ -1,2 +0,2 @@

// The Statistics strategy uses the measure of each end-start time for each
// The Statistics strategy uses the measure of each end-start time for each
// query executed against the db to calculate the mean, variance and standard deviation

@@ -9,7 +9,7 @@ // and pick the server which the lowest mean and deviation

// Starts any needed code
StatisticsStrategy.prototype.start = function(callback) {
StatisticsStrategy.prototype.start = function(callback) {
callback(null, null);
}
StatisticsStrategy.prototype.stop = function(callback) {
StatisticsStrategy.prototype.stop = function(callback) {
// Remove reference to replicaset

@@ -21,22 +21,63 @@ this.replicaset = null;

StatisticsStrategy.prototype.checkoutSecondary = function() {
// Get all secondary server keys
var keys = Object.keys(this.replicaset._state.secondaries);
// Contains the picked instance
var minimumSscore = null;
var selectedInstance = null;
StatisticsStrategy.prototype.checkoutSecondary = function(tags, secondaryCandidates) {
// Servers are picked based on the lowest ping time and then servers that lower than that + secondaryAcceptableLatencyMS
// Create a list of candidat servers, containing the primary if available
var candidateServers = [];
// Pick server key by the lowest ping time
for(var i = 0; i < keys.length; i++) {
// Fetch a server
var server = this.replicaset._state.secondaries[keys[i]];
// Pick server by lowest Sscore
if(minimumSscore == null || (server.queryStats.sScore < minimumSscore)) {
minimumSscore = server.queryStats.sScore;
selectedInstance = server;
// If we have not provided a list of candidate servers use the default setup
if(!Array.isArray(secondaryCandidates)) {
candidateServers = this.replicaset._state.master != null ? [this.replicaset._state.master] : [];
// Add all the secondaries
var keys = Object.keys(this.replicaset._state.secondaries);
for(var i = 0; i < keys.length; i++) {
candidateServers.push(this.replicaset._state.secondaries[keys[i]])
}
} else {
candidateServers = secondaryCandidates;
}
// Return the selected instance
return selectedInstance != null ? selectedInstance.checkoutReader() : null;
// Final list of eligable server
var finalCandidates = [];
// If we have tags filter by tags
if(tags != null && typeof tags == 'object') {
// If we have an array or single tag selection
var tagObjects = Array.isArray(tags) ? tags : [tags];
// Iterate over all tags until we find a candidate server
for(var _i = 0; _i < tagObjects.length; _i++) {
// Grab a tag object
var tagObject = tagObjects[_i];
// Matching keys
var matchingKeys = Object.keys(tagObject);
// Remove any that are not tagged correctly
for(var i = 0; i < candidateServers.length; i++) {
var server = candidateServers[i];
// If we have tags match
if(server.tags != null) {
var matching = true;
// Ensure we have all the values
for(var j = 0; j < matchingKeys.length; j++) {
if(server.tags[matchingKeys[j]] != tagObject[matchingKeys[j]]) {
matching = false;
break;
}
}
// If we have a match add it to the list of matching servers
if(matching) {
finalCandidates.push(server);
}
}
}
}
} else {
// Final array candidates
var finalCandidates = candidateServers;
}
// If no candidates available return an error
if(finalCandidates.length == 0) return new Error("No replica set members available for query");
// Pick a random server
return finalCandidates[Math.round(Math.random(1000000) * (finalCandidates.length - 1))].checkoutReader();
}

@@ -10,3 +10,3 @@ var QueryCommand = require('./commands/query_command').QueryCommand,

* Constructor for a cursor object that handles all the operations on query result
* using find. This cursor object is unidirectional and cannot traverse backwards. Clients should not be creating a cursor directly,
* using find. This cursor object is unidirectional and cannot traverse backwards. Clients should not be creating a cursor directly,
* but use find to acquire a cursor.

@@ -27,2 +27,3 @@ *

* @param {Boolean} tailable allow the cursor to be tailable.
* @param {Boolean} awaitdata allow the cursor to wait for data, only applicable for tailable cursor.
* @param {Number} batchSize the number of the subset of results to request the database to return for every request. This should initially be greater than 1 otherwise the database will automatically close the cursor. The batch size can be set to 1 with cursorInstance.batchSize after performing the initial query to the database.

@@ -37,6 +38,9 @@ * @param {Boolean} raw return all query documents as raw buffers (default false).

* @param {String} comment you can put a $comment field on a query to make looking in the profiler logs simpler.
* @param {Boolean} awaitdata allow the cursor to wait for data, only applicable for tailable cursor.
* @param {Number} numberOfRetries if using awaidata specifies the number of times to retry on timeout.
* @param {String} dbName override the default dbName.
*/
function Cursor(db, collection, selector, fields, skip, limit
, sort, hint, explain, snapshot, timeout, tailable, batchSize, slaveOk, raw, read
, returnKey, maxScan, min, max, showDiskLoc, comment) {
, sort, hint, explain, snapshot, timeout, tailable, batchSize, slaveOk, raw, read
, returnKey, maxScan, min, max, showDiskLoc, comment, awaitdata, numberOfRetries, dbName) {
this.db = db;

@@ -54,2 +58,4 @@ this.collection = collection;

this.tailable = tailable;
this.awaitdata = awaitdata;
this.numberOfRetries = numberOfRetries == null ? 1 : numberOfRetries;
this.batchSizeValue = batchSize == null ? 0 : batchSize;

@@ -65,3 +71,3 @@ this.slaveOk = slaveOk == null ? collection.slaveOk : slaveOk;

this.comment = comment;
this.totalNumberOfRecords = 0;

@@ -71,2 +77,5 @@ this.items = [];

// This name
this.dbName = dbName;
// State variables for the cursor

@@ -77,3 +86,9 @@ this.state = Cursor.INIT;

this.getMoreTimer = false;
this.collectionName = (this.db.databaseName ? this.db.databaseName + "." : '') + this.collection.collectionName;
// If we are using a specific db execute against it
if(this.dbName != null) {
this.collectionName = this.dbName + "." + this.collection.collectionName;
} else {
this.collectionName = (this.db.databaseName ? this.db.databaseName + "." : '') + this.collection.collectionName;
}
};

@@ -85,3 +100,3 @@

*
* @return {Cursor} returns itself with rewind applied.
* @return {Cursor} returns itself with rewind applied.
* @api public

@@ -104,3 +119,3 @@ */

}
return self;

@@ -113,7 +128,7 @@ };

* is enough memory to store the results. Note that the array only contain partial
* results when this cursor had been previouly accessed. In that case,
* results when this cursor had been previouly accessed. In that case,
* cursor.rewind() can be used to reset the cursor.
*
* @param {Function} callback This will be called after executing this method successfully. The first paramter will contain the Error object if an error occured, or null otherwise. The second paramter will contain an array of BSON deserialized objects as a result of the query.
* @return {null}
* @return {null}
* @api public

@@ -132,4 +147,4 @@ */

var items = [];
this.each(function(err, item) {
this.each(function(err, item) {
if(err != null) return callback(err, null);

@@ -161,3 +176,3 @@

* @param {Function} callback this will be called for while iterating every document of the query result. The first paramter will contain the Error object if an error occured, or null otherwise. While the second paramter will contain the document.
* @return {null}
* @return {null}
* @api public

@@ -177,3 +192,3 @@ */

// Fetch the next object until there is no more objects
self.nextObject(function(err, item) {
self.nextObject(function(err, item) {
if(err != null) return callback(err, null);

@@ -199,3 +214,3 @@ if(item != null) {

* @param {Function} callback this will be after executing this method. The first paramter will contain the Error object if an error occured, or null otherwise. While the second paramter will contain the number of results or null if an error occured.
* @return {null}
* @return {null}
* @api public

@@ -255,6 +270,6 @@ */

throw new Error("Tailable cursor doesn't support limit");
}
}
} else if(this.queryRun == true || this.state == Cursor.CLOSED) {
if(callback) {
callback(new Error("Cursor is closed"), null);
callback(new Error("Cursor is closed"), null);
} else {

@@ -266,4 +281,4 @@ throw new Error("Cursor is closed");

if(callback) {
callback(new Error("limit requires an integer"), null);
} else {
callback(new Error("limit requires an integer"), null);
} else {
throw new Error("limit requires an integer");

@@ -281,2 +296,26 @@ }

/**
* Sets the read preference for the cursor
*
* @param {String} the read preference for the cursor, one of Server.READ_PRIMARY, Server.READ_SECONDARY, Server.READ_SECONDARY_ONLY
* @param {Function} [callback] this optional callback will be called after executing this method. The first parameter will contain an error object when the read preference given is not a valid number or when the cursor is already closed while the second parameter will contain a reference to this object upon successful execution.
* @return {Cursor} an instance of this object.
* @api public
*/
Cursor.prototype.setReadPreference = function(readPreference, tags, callback) {
if(typeof tags == 'function') callback = tags;
callback = callback || function(){};
if(this.queryRun == true || this.state == Cursor.CLOSED) {
callback(new Error("Cannot change read preference on executed query or closed cursor"));
} else if(readPreference == null && readPreference != 'primary'
&& readPreference != 'secondaryOnly' && readPreference != 'secondary') {
callback(new Error("only readPreference of primary, secondary or secondaryOnly supported"));
} else {
this.read = readPreference;
}
return this;
}
/**
* Sets the skip parameter of this cursor to the given value.

@@ -338,3 +377,3 @@ *

/**
* The limit used for the getMore command
* The limit used for the getMore command
*

@@ -349,3 +388,3 @@ * @return {Number} The number of records to request per batch.

var absBatchValue = Math.abs(self.batchSizeValue);
if(absLimitValue > 0) {

@@ -380,2 +419,7 @@ if (absBatchValue > 0) {

self.skipValue = self.limitValue = 0;
// if awaitdata is set
if(self.awaitdata != null) {
queryOptions |= QueryCommand.OPTS_AWAIT_DATA;
}
}

@@ -396,6 +440,5 @@

// Build special selector
var specialSelector = {'query':self.selector};
var specialSelector = {'$query':self.selector};
if(self.sortValue != null) specialSelector['orderby'] = utils.formattedOrderClause(self.sortValue);
if(self.hint != null && self.hint.constructor == Object) specialSelector['$hint'] = self.hint;
if(self.explainValue != null) specialSelector['$explain'] = true;
if(self.snapshot != null) specialSelector['$snapshot'] = true;

@@ -408,2 +451,7 @@ if(self.returnKey != null) specialSelector['$returnKey'] = self.returnKey;

if(self.comment != null) specialSelector['$comment'] = self.comment;
// If we have explain set only return a single document with automatic cursor close
if(self.explainValue != null) {
numberToReturn = (-1)*Math.abs(numberToReturn);
specialSelector['$explain'] = true;
}

@@ -430,3 +478,3 @@ // Return the query

*
* @param sortDirection {String|number} Range of acceptable values:
* @param sortDirection {String|number} Range of acceptable values:
* 'ascending', 'descending', 'asc', 'desc', 1, -1

@@ -477,3 +525,3 @@ *

}
result = null;

@@ -483,3 +531,11 @@ self.nextObject(callback);

self.db._executeQueryCommand(cmd, {read:self.read, raw:self.raw}, commandHandler);
// If we have no connection set on this cursor check one out
if(self.connection == null) {
// Fetch either a reader or writer dependent on the specified read option
self.connection = this.read == null ? self.db.serverConfig.checkoutWriter() : self.db.serverConfig.checkoutReader(this.read);
}
// Execute the command
self.db._executeQueryCommand(cmd, {raw:self.raw, read:this.read, connection:self.connection}, commandHandler);
// Set the command handler to null
commandHandler = null;

@@ -505,3 +561,3 @@ } else if(self.items.length) {

var limit = 0;
if (!self.tailable && self.limitValue > 0) {

@@ -522,3 +578,4 @@ limit = self.limitValue - self.totalNumberOfRecords;

var options = { read: self.read, raw: self.raw };
// Set up options
var options = { read: self.read, raw: self.raw, connection:self.connection };

@@ -528,3 +585,5 @@ // Execute the command

try {
if(err != null) callback(err, null);
if(err != null) {
return callback(err, null);
}

@@ -549,4 +608,14 @@ var isDead = 1 === result.responseFlag && result.cursorId.isZero();

callback(null, self.items.shift());
} else if(self.tailable && !isDead && self.awaitdata) {
// Excute the tailable cursor once more, will timeout after ~4 sec if awaitdata used
self.numberOfRetries = self.numberOfRetries - 1;
if(self.numberOfRetries == 0) {
self.close(function() {
callback(new Error("tailable cursor timed out"), null);
});
} else {
process.nextTick(function() {getMore(self, callback);});
}
} else if(self.tailable && !isDead) {
self.getMoreTimer = setTimeout(function() {getMore(self, callback);}, 500);
self.getMoreTimer = setTimeout(function() {getMore(self, callback);}, 100);
} else {

@@ -564,6 +633,6 @@ self.close(function() {callback(null, null);});

} catch(err) {
var handleClose = function() {
var handleClose = function() {
callback(err, null);
};
self.close(handleClose);

@@ -584,5 +653,7 @@ handleClose = null;

// Create a new cursor and fetch the plan
var cursor = new Cursor(this.db, this.collection, this.selector, this.fields, this.skipValue, limit,
this.sortValue, this.hint, true, this.snapshot, this.timeout, this.tailable, this.batchSizeValue);
// Fetch the explaination document
var cursor = new Cursor(this.db, this.collection, this.selector, this.fields, this.skipValue, limit
, this.sortValue, this.hint, true, this.snapshot, this.timeout, this.tailable, this.batchSizeValue
, this.slaveOk, this.raw, this.read, this.returnKey, this.maxScan, this.min, this.max, this.showDiskLoc
, this.comment, this.awaitdata, this.numberOfRetries, this.dbName);
// Fetch the explaination document
cursor.nextObject(function(err, item) {

@@ -631,3 +702,3 @@ if(err != null) return callback(err, null);

function execute(command) {
self.db._executeQueryCommand(command, {read:self.read, raw:self.raw}, function(err,result) {
self.db._executeQueryCommand(command, {read:self.read, raw:self.raw, connection:self.connection}, function(err,result) {
if(err) {

@@ -645,3 +716,3 @@ stream.emit('error', err);

}
var resflagsMap = {

@@ -653,3 +724,3 @@ CursorNotFound:1<<0,

};
if(result.documents && result.documents.length && !(result.responseFlag & resflagsMap.QueryFailure)) {

@@ -683,3 +754,3 @@ try {

}
return stream;

@@ -712,6 +783,8 @@ };

var command = new KillCursorCommand(this.db, [this.cursorId]);
this.db._executeQueryCommand(command, {read:self.read, raw:self.raw}, null);
this.db._executeQueryCommand(command, {read:self.read, raw:self.raw, connection:self.connection}, null);
} catch(err) {}
}
// Null out the connection
self.connection = null;
// Reset cursor id

@@ -742,3 +815,3 @@ this.cursorId = Long.fromInt(0);

* Init state
*
*
* @classconstant INIT

@@ -750,3 +823,3 @@ **/

* Cursor open
*
*
* @classconstant OPEN

@@ -758,3 +831,3 @@ **/

* Cursor closed
*
*
* @classconstant CLOSED

@@ -761,0 +834,0 @@ **/

@@ -14,3 +14,3 @@ /**

* - **error** {function(err) {}} the error event triggers if an error happens.
* - **end** {function() {}} the end event triggers when there is no more documents available.
* - **close** {function() {}} the end event triggers when there is no more documents available.
*

@@ -17,0 +17,0 @@ * @class Represents a CursorStream.

@@ -21,3 +21,3 @@ var Binary = require('bson').Binary,

if(!(this instanceof Chunk)) return new Chunk(file, mongoObject);
this.file = file;

@@ -40,3 +40,3 @@ var self = this;

this.data = new Binary(buffer);
} else if(mongoObjectFinal.data instanceof Binary || Object.prototype.toString.call(mongoObjectFinal.data) == "[object Binary]") {
} else if(mongoObjectFinal.data instanceof Binary || Object.prototype.toString.call(mongoObjectFinal.data) == "[object Binary]") {
this.data = mongoObjectFinal.data;

@@ -74,6 +74,6 @@ } else if(Buffer.isBuffer(mongoObjectFinal.data)) {

*/
Chunk.prototype.read = function(length) {
Chunk.prototype.read = function(length) {
// Default to full read if no index defined
length = length == null || length == 0 ? this.length() : length;
if(this.length() - this.internalPosition + 1 >= length) {

@@ -89,3 +89,3 @@ var data = this.data.read(this.internalPosition, length);

Chunk.prototype.readSlice = function(length) {
if ((this.length() - this.internalPosition + 1) >= length) {
if ((this.length() - this.internalPosition) >= length) {
var data = null;

@@ -147,7 +147,11 @@ if (this.data.buffer != null) { //Pure BSON

self.file.chunkCollection(function(err, collection) {
if(err) return callback(err);
collection.remove({'_id':self.objectId}, {safe:true}, function(err, result) {
if(err) return callback(err);
if(self.data.length() > 0) {
self.buildMongoObject(function(mongoObject) {
self.buildMongoObject(function(mongoObject) {
collection.insert(mongoObject, {safe:true}, function(err, collection) {
callback(null, self);
callback(err, self);
});

@@ -208,3 +212,3 @@ });

}
});
});

@@ -211,0 +215,0 @@ /**

@@ -15,3 +15,5 @@ /**

util = require('util'),
ReadStream = require('./readstream').ReadStream;
inherits = util.inherits,
ReadStream = require('./readstream').ReadStream,
Stream = require('stream');

@@ -31,3 +33,3 @@ var REFERENCE_BY_FILENAME = 0,

* - **root** {String}, root collection to use. Defaults to **{GridStore.DEFAULT_ROOT_COLLECTION}**.
* - **chunk_type** {String}, mime type of the file. Defaults to **{GridStore.DEFAULT_CONTENT_TYPE}**.
* - **content_type** {String}, mime type of the file. Defaults to **{GridStore.DEFAULT_CONTENT_TYPE}**.
* - **chunk_size** {Number}, size for the chunk. Defaults to **{Chunk.DEFAULT_CHUNK_SIZE}**.

@@ -44,26 +46,41 @@ * - **metadata** {Object}, arbitrary data the user wants to store.

*/
function GridStore(db, id, filename, mode, options) {
var GridStore = function GridStore(db, id, filename, mode, options) {
if(!(this instanceof GridStore)) return new GridStore(db, id, filename, mode, options);
var self = this;
this.db = db;
var _filename = filename;
this.db = db;
if(typeof filename == 'string' && typeof mode == 'string') {
_filename = filename;
} else if(typeof filename == 'string' && typeof mode == 'object' && mode != null) {
var _mode = mode;
// Call stream constructor
Stream.call(this);
// Handle options
if(options == null) options = {};
// Handle mode
if(mode == null) {
mode = filename;
options = _mode;
_filename = id;
} else if(typeof filename == 'string' && mode == null) {
filename = null;
} else if(typeof mode == 'object') {
options = mode;
mode = filename;
_filename = id;
filename = null;
}
// set grid referencetype
this.referenceBy = typeof id == 'string' ? 0 : 1;
this.filename = _filename;
this.fileId = typeof id == 'string' ? new ObjectID() : id;
// Handle id
if(id instanceof ObjectID && (typeof filename == 'string' || filename == null)) {
this.referenceBy = 1;
this.fileId = id;
this.filename = filename;
} else if(!(id instanceof ObjectID) && typeof id == 'string' && mode.indexOf("w") != null) {
this.referenceBy = 0;
this.fileId = new ObjectID();
this.filename = id;
} else if(!(id instanceof ObjectID) && typeof id == 'string' && mode.indexOf("r") != null) {
this.referenceBy = 0;
this.filename = filename;
} else {
this.referenceBy = 1;
this.fileId = id;
this.filename = filename;
}
// Set up the rest

@@ -75,3 +92,3 @@ this.mode = mode == null ? "r" : mode;

// Set default chunk size
this.internalChunkSize = this.options['chunkSize'] == null ? Chunk.DEFAULT_CHUNK_SIZE : this.options['chunkSize'];
this.internalChunkSize = this.options['chunkSize'] == null ? Chunk.DEFAULT_CHUNK_SIZE : this.options['chunkSize'];
// Previous chunk size

@@ -82,2 +99,13 @@ this.previousChunkSize = 0;

/**
* Code for the streaming capabilities of the gridstore object
* Most code from Aaron heckmanns project https://github.com/aheckmann/gridfs-stream
* Modified to work on the gridstore object itself
* @ignore
*/
GridStore.prototype = { __proto__: Stream.prototype }
// Move pipe to _pipe
GridStore.prototype._pipe = GridStore.prototype.pipe;
/**
* Opens the file from the database and initialize this object. Also creates a

@@ -97,11 +125,21 @@ * new one if file does not exist.

var self = this;
if((self.mode == "w" || self.mode == "w+") && self.db.serverConfig.primary != null) {
// Get files collection
self.collection(function(err, collection) {
// Get chunk collection
self.chunkCollection(function(err, chunkCollection) {
// Ensure index on chunk collection
chunkCollection.ensureIndex([['files_id', 1], ['n', 1]], function(err, index) {
_open(self, callback);
if(err) return callback(err);
// Put index on filename
collection.ensureIndex([['filename', 1]], function(err, index) {
if(err) return callback(err);
// Get chunk collection
self.chunkCollection(function(err, chunkCollection) {
if(err) return callback(err);
// Ensure index on chunk collection
chunkCollection.ensureIndex([['files_id', 1], ['n', 1]], function(err, index) {
if(err) return callback(err);
_open(self, callback);
});
});

@@ -111,2 +149,3 @@ });

} else {
// Open the gridstore
_open(self, callback);

@@ -127,3 +166,3 @@ }

}
// Create the query

@@ -136,7 +175,12 @@ var query = self.referenceBy == REFERENCE_BY_ID ? {_id:self.fileId} : {filename:self.filename};

collection.find(query, function(err, cursor) {
if(err) return error(err);
// Fetch the file
cursor.nextObject(function(err, doc) {
if(err) return error(err);
// Check if the collection for the files exists otherwise prepare the new one
if(doc != null) {
self.fileId = doc._id;
self.filename = doc.filename;
self.contentType = doc.contentType;

@@ -150,4 +194,2 @@ self.internalChunkSize = doc.chunkSize;

} else {
// self.fileId =
// self.fileId = self.fileId instanceof ObjectID ? self.fileId : new ObjectID();
self.fileId = self.fileId == null ? new ObjectID() : self.fileId;

@@ -162,2 +204,3 @@ self.contentType = exports.GridStore.DEFAULT_CONTENT_TYPE;

nthChunk(self, 0, function(err, chunk) {
if(err) return error(err);
self.currentChunk = chunk;

@@ -170,2 +213,3 @@ self.position = 0;

deleteChunks(self, function(err, result) {
if(err) return error(err);
self.currentChunk = new Chunk(self, {'n':0});

@@ -180,2 +224,3 @@ self.contentType = self.options['content_type'] == null ? self.contentType : self.options['content_type'];

nthChunk(self, lastChunkNumber(self), function(err, chunk) {
if(err) return error(err);
// Set the current chunk

@@ -197,4 +242,6 @@ self.currentChunk = chunk == null ? new Chunk(self, {'n':0}) : chunk;

self.length = 0;
self.chunkCollection(function(err, collection2) {
if(err) return error(err);
// No file exists set up write mode

@@ -204,2 +251,3 @@ if(self.mode == "w") {

deleteChunks(self, function(err, result) {
if(err) return error(err);
self.currentChunk = new Chunk(self, {'n':0});

@@ -214,2 +262,3 @@ self.contentType = self.options['content_type'] == null ? self.contentType : self.options['content_type'];

nthChunk(self, lastChunkNumber(self), function(err, chunk) {
if(err) return error(err);
// Set the current chunk

@@ -226,2 +275,8 @@ self.currentChunk = chunk == null ? new Chunk(self, {'n':0}) : chunk;

});
// only pass error to callback once
function error (err) {
if(error.err) return;
callback(error.err = err);
}
};

@@ -241,3 +296,3 @@

fs.open(file, 'r', 0666, function (err, fd) {
// TODO Handle err
if(err) return callback(err);
self.writeFile(fd, callback);

@@ -249,24 +304,34 @@ });

self.open(function (err, self) {
if(err) return callback(err);
fs.fstat(file, function (err, stats) {
if(err) return callback(err);
var offset = 0;
var index = 0;
var numberOfChunksLeft = Math.min(stats.size / self.chunkSize);
// Write a chunk
var writeChunk = function() {
fs.read(file, self.chunkSize, offset, 'binary', function(err, data, bytesRead) {
if(err) return callback(err);
offset = offset + bytesRead;
// Create a new chunk for the data
var chunk = new Chunk(self, {n:index++});
chunk.write(data, function(err, chunk) {
if(err) return callback(err);
chunk.save(function(err, result) {
if(err) return callback(err);
self.position = self.position + data.length;
// Point to current chunk
self.currentChunk = chunk;
if(offset >= stats.size) {
fs.close(file);
self.close(function(err, result) {
return callback(null, result);
})
self.close(callback);
} else {

@@ -279,3 +344,3 @@ return process.nextTick(writeChunk);

}
// Process the first write

@@ -288,21 +353,2 @@ process.nextTick(writeChunk);

/**
* Writes some data. This method will work properly only if initialized with mode "w" or "w+".
*
* @param {String|Buffer} data the data to write.
* @param {Boolean} [close] closes this file after writing if set to true.
* @param {Function} callback this will be called after executing this method. The first parameter will contain null and the second one will contain a reference to this object.
* @return {null}
* @api public
*/
GridStore.prototype.write = function(data, close, callback) {
// If we have a buffer write it using the writeBuffer method
if(Buffer.isBuffer(data)) {
return writeBuffer(this, data, close, callback);
} else {
// Wrap the string in a buffer and write
return writeBuffer(this, new Buffer(data, 'binary'), close, callback);
}
};
/**
* Writes some data. This method will work properly only if initialized with mode

@@ -322,8 +368,8 @@ * "w" or "w+".

var writeBuffer = function(self, buffer, close, callback) {
if(typeof close === "function") { callback = close; close = null; }
var finalClose = (close == null) ? false : close;
if(self.mode[0] != "w") {
callback(new Error((self.referenceBy == REFERENCE_BY_ID ? self.toHexString() : self.filename) + " not opened for writing"), null);
} else {
if(typeof close === "function") { callback = close; close = null; }
var finalClose = (close == null) ? false : close;
if(self.mode[0] != "w") {
callback(new Error((self.referenceBy == REFERENCE_BY_ID ? self.toHexString() : self.filename) + " not opened for writing"), null);
} else {
if(self.currentChunk.position + buffer.length >= self.chunkSize) {

@@ -334,3 +380,3 @@ // Write out the current Chunk and then keep writing until we have less data left than a chunkSize left

var leftOverDataSize = self.chunkSize - self.currentChunk.position;
var firstChunkData = buffer.slice(0, leftOverDataSize);
var firstChunkData = buffer.slice(0, leftOverDataSize);
var leftOverData = buffer.slice(leftOverDataSize);

@@ -352,3 +398,3 @@ // A list of chunks to write out

}
// Set current chunk with remaining data

@@ -358,3 +404,3 @@ self.currentChunk = new Chunk(self, {'n': (previousChunkNumber + 1)});

if(leftOverData.length > 0) self.currentChunk.write(leftOverData);
// Update the position for the gridstore

@@ -368,4 +414,6 @@ self.position = self.position + buffer.length;

chunk.save(function(err, result) {
if(err) return callback(err);
numberOfChunksToWrite = numberOfChunksToWrite - 1;
if(numberOfChunksToWrite <= 0) {

@@ -375,3 +423,3 @@ return callback(null, self);

})
}
}
} else {

@@ -382,3 +430,3 @@ // Update the position for the gridstore

self.currentChunk.write(buffer);
callback(null, self);
callback(null, self);
}

@@ -394,3 +442,3 @@ }

* the structure:
*
*
* <pre><code>

@@ -423,9 +471,5 @@ * {

}
// console.log("============================== self.currentChunk.chunkNumber :: " + self.currentChunk.chunkNumber)
// console.log("============================== self.currentChunk.position :: " + self.currentChunk.position)
// console.log(self.position)
// Calcuate the length
var length = self.currentChunk != null ? (chunkNumber * self.chunkSize + previousChunkSize) : 0;
var length = self.currentChunk != null ? (chunkNumber * self.chunkSize + previousChunkSize) : 0;
var mongoObject = {

@@ -464,8 +508,14 @@ '_id': self.fileId,

self.currentChunk.save(function(err, chunk) {
if(err) return callback(err);
self.collection(function(err, files) {
if(err) return callback(err);
// Build the mongo object
if(self.uploadDate != null) {
files.remove({'_id':self.fileId}, {safe:true}, function(err, collection) {
if(err) return callback(err);
buildMongoObject(self, function(mongoObject) {
files.save(mongoObject, {safe:true}, function(err, doc) {
files.save(mongoObject, {safe:true}, function(err) {
callback(err, mongoObject);

@@ -478,3 +528,3 @@ });

buildMongoObject(self, function(mongoObject) {
files.save(mongoObject, {safe:true}, function(err, doc) {
files.save(mongoObject, {safe:true}, function(err) {
callback(err, mongoObject);

@@ -488,5 +538,7 @@ });

self.collection(function(err, files) {
if(err) return callback(err);
self.uploadDate = new Date();
buildMongoObject(self, function(mongoObject) {
files.save(mongoObject, {safe:true}, function(err, doc) {
files.save(mongoObject, {safe:true}, function(err) {
callback(err, mongoObject);

@@ -518,4 +570,10 @@ });

self.chunkCollection(function(err, collection) {
if(err) return callback(err);
collection.find({'files_id':self.fileId, 'n':chunkNumber}, function(err, cursor) {
if(err) return callback(err);
cursor.nextObject(function(err, chunk) {
if(err) return callback(err);
var finalChunk = chunk == null ? {} : chunk;

@@ -570,6 +628,5 @@ callback(null, new Chunk(self, finalChunk));

self.chunkCollection(function(err, collection) {
if(err!==null) {
callback(err, false);
}
if(err) return callback(err, false);
collection.remove({'files_id':self.fileId}, {safe:true}, function(err, result) {
if(err) return callback(err, false);
callback(null, true);

@@ -594,13 +651,13 @@ });

if(err!==null) {
callback("at deleteChunks: "+err);
return;
err.message = "at deleteChunks: " + err.message;
return callback(err);
}
self.collection(function(err, collection) {
if(err!==null) {
callback("at collection: "+err);
return;
err.message = "at collection: " + err.message;
return callback(err);
}
collection.remove({'_id':self.fileId}, {safe:true}, function(err, collection) {
collection.remove({'_id':self.fileId}, {safe:true}, function(err) {
callback(err, self);

@@ -636,3 +693,5 @@ });

this.read(function(err, data) {
this.read(function(err, data) {
if(err) return callback(err);
var items = data.toString().split(separator);

@@ -643,3 +702,3 @@ items = items.length > 0 ? items.splice(0, items.length - 1) : [];

}
callback(null, items);

@@ -663,2 +722,3 @@ });

deleteChunks(self, function(err, gridStore) {
if(err) return callback(err);
self.currentChunk = new Chunk(self, {'n': 0});

@@ -670,2 +730,3 @@ self.position = 0;

self.currentChunk(0, function(err, chunk) {
if(err) return callback(err);
self.currentChunk = chunk;

@@ -713,3 +774,3 @@ self.currentChunk.rewind();

if((self.currentChunk.length() - self.currentChunk.position + 1 + finalBuffer._index) >= finalLength) {
if((self.currentChunk.length() - self.currentChunk.position + finalBuffer._index) >= finalLength) {
var slice = self.currentChunk.readSlice(finalLength - finalBuffer._index);

@@ -725,3 +786,2 @@ // Copy content to final buffer

} else {
// console.dir(self.currentChunk)
var slice = self.currentChunk.readSlice(self.currentChunk.length() - self.currentChunk.position);

@@ -733,4 +793,6 @@ // Copy content to final buffer

// Load next chunk and read more
// Load next chunk and read more
nthChunk(self, self.currentChunk.chunkNumber + 1, function(err, chunk) {
if(err) return callback(err);
if(chunk.length() > 0) {

@@ -740,3 +802,7 @@ self.currentChunk = chunk;

} else {
finalBuffer._index > 0 ? callback(null, finalBuffer) : callback(new Error("no chunks found for file, possibly corrupt"), null);
if (finalBuffer._index > 0) {
callback(null, finalBuffer)
} else {
callback(new Error("no chunks found for file, possibly corrupt"), null);
}
}

@@ -799,3 +865,3 @@ });

self.currentChunk.position = (self.position % self.chunkSize);
callback(null, self);
callback(err, self);
});

@@ -805,3 +871,4 @@ };

if(self.mode[0] == 'w') {
self.currentChunk.save(function(err, chunk) {
self.currentChunk.save(function(err) {
if(err) return callback(err);
seekChunk();

@@ -845,3 +912,3 @@ });

self.position = self.position + 1;
callback(null, self.currentChunk.getc());
callback(err, self.currentChunk.getc());
});

@@ -887,3 +954,3 @@ } else {

* The collection to be used for holding the files and chunks collection.
*
*
* @classconstant DEFAULT_ROOT_COLLECTION

@@ -895,3 +962,3 @@ **/

* Default file mime type
*
*
* @classconstant DEFAULT_CONTENT_TYPE

@@ -903,3 +970,3 @@ **/

* Seek mode where the given length is absolute.
*
*
* @classconstant IO_SEEK_SET

@@ -911,3 +978,3 @@ **/

* Seek mode where the given length is an offset to the current read/write head.
*
*
* @classconstant IO_SEEK_CUR

@@ -919,3 +986,3 @@ **/

* Seek mode where the given length is an offset to the end of the file.
*
*
* @classconstant IO_SEEK_END

@@ -943,7 +1010,14 @@ **/

db.collection(rootCollectionFinal + ".files", function(err, collection) {
if(err) return callback(err);
// Build query
var query = (typeof fileIdObject == 'string' || Object.prototype.toString.call(fileIdObject) == '[object RegExp]' )
? {'filename':fileIdObject} : {'_id':fileIdObject}; // Attempt to locate file
? {'filename':fileIdObject}
: {'_id':fileIdObject}; // Attempt to locate file
collection.find(query, function(err, cursor) {
if(err) return callback(err);
cursor.nextObject(function(err, item) {
if(err) return callback(err);
callback(null, item == null ? false : true);

@@ -975,3 +1049,3 @@ });

}
// Check if we are returning by id not filename

@@ -983,10 +1057,14 @@ var byId = options['id'] != null ? options['id'] : false;

db.collection((rootCollectionFinal + ".files"), function(err, collection) {
if(err) return callback(err);
collection.find(function(err, cursor) {
cursor.each(function(err, item) {
if(item != null) {
items.push(byId ? item._id : item.filename);
} else {
callback(null, items);
}
});
if(err) return callback(err);
cursor.each(function(err, item) {
if(item != null) {
items.push(byId ? item._id : item.filename);
} else {
callback(err, items);
}
});
});

@@ -1023,2 +1101,3 @@ });

new GridStore(db, name, "r", options).open(function(err, gridStore) {
if(err) return callback(err);
// Make sure we are not reading out of bounds

@@ -1028,13 +1107,10 @@ if(offset && offset >= gridStore.length) return callback("offset larger than size of file", null);

if(offset && length && (offset + length) > gridStore.length) return callback("offset and length is larger than the size of the file", null);
if(offset != null) {
gridStore.seek(offset, function(err, gridStore) {
gridStore.read(length, function(err, data) {
callback(err, data);
});
if(err) return callback(err);
gridStore.read(length, callback);
});
} else {
gridStore.read(length, function(err, data) {
callback(err, data);
});
gridStore.read(length, callback);
}

@@ -1063,5 +1139,4 @@ });

new GridStore(db, name, "r", options).open(function(err, gridStore) {
gridStore.readlines(finalSeperator, function(err, lines) {
callback(err, lines);
});
if(err) return callback(err);
gridStore.readlines(finalSeperator, callback);
});

@@ -1098,4 +1173,7 @@ };

new GridStore(db, names, "w", options).open(function(err, gridStore) {
if(err) return callback(err);
deleteChunks(gridStore, function(err, result) {
if(err) return callback(err);
gridStore.collection(function(err, collection) {
if(err) return callback(err);
collection.remove({'_id':gridStore.fileId}, {safe:true}, function(err, collection) {

@@ -1112,3 +1190,3 @@ callback(err, self);

* Returns the current chunksize of the file.
*
*
* @field chunkSize

@@ -1135,3 +1213,3 @@ * @type {Number}

* The md5 checksum for this file.
*
*
* @field md5

@@ -1150,5 +1228,289 @@ * @type {Number}

/**
* GridStore Streaming methods
* Handles the correct return of the writeable stream status
* @ignore
*/
Object.defineProperty(GridStore.prototype, "writable", { enumerable: true
, get: function () {
if(this._writeable == null) {
this._writeable = this.mode != null && this.mode.indexOf("w") != -1;
}
// Return the _writeable
return this._writeable;
}
, set: function(value) {
this._writeable = value;
}
});
/**
* Handles the correct return of the readable stream status
* @ignore
*/
Object.defineProperty(GridStore.prototype, "readable", { enumerable: true
, get: function () {
if(this._readable == null) {
this._readable = this.mode != null && this.mode.indexOf("r") != -1;
}
return this._readable;
}
, set: function(value) {
this._readable = value;
}
});
GridStore.prototype.paused;
/**
* Handles the correct setting of encoding for the stream
* @ignore
*/
GridStore.prototype.setEncoding = fs.ReadStream.prototype.setEncoding;
/**
* Handles the end events
* @ignore
*/
GridStore.prototype.end = function end(data) {
var self = this;
// allow queued data to write before closing
if(!this.writable) return;
this.writable = false;
if(data) {
this._q.push(data);
}
this.on('drain', function () {
self.close(function (err) {
if (err) return _error(self, err);
self.emit('close');
});
});
_flush(self);
}
/**
* Handles the normal writes to gridstore
* @ignore
*/
var _writeNormal = function(self, data, close, callback) {
// If we have a buffer write it using the writeBuffer method
if(Buffer.isBuffer(data)) {
return writeBuffer(self, data, close, callback);
} else {
// Wrap the string in a buffer and write
return writeBuffer(self, new Buffer(data, 'binary'), close, callback);
}
}
/**
* Writes some data. This method will work properly only if initialized with mode "w" or "w+".
*
* @param {String|Buffer} data the data to write.
* @param {Boolean} [close] closes this file after writing if set to true.
* @param {Function} callback this will be called after executing this method. The first parameter will contain null and the second one will contain a reference to this object.
* @return {null}
* @api public
*/
GridStore.prototype.write = function write(data, close, callback) {
// If it's a normal write delegate the call
if(typeof close == 'function' || typeof callback == 'function') {
return _writeNormal(this, data, close, callback);
}
// Otherwise it's a stream write
var self = this;
if (!this.writable) {
throw new Error('GridWriteStream is not writable');
}
// queue data until we open.
if (!this._opened) {
// Set up a queue to save data until gridstore object is ready
this._q = [];
_openStream(self);
this._q.push(data);
return false;
}
// Push data to queue
this._q.push(data);
_flush(this);
// Return write successful
return true;
}
/**
* Handles the destroy part of a stream
* @ignore
*/
GridStore.prototype.destroy = function destroy() {
// close and do not emit any more events. queued data is not sent.
if(!this.writable) return;
this.readable = false;
if(this.writable) {
this.writable = false;
this._q.length = 0;
this.emit('close');
}
}
/**
* Handles the destroySoon part of a stream
* @ignore
*/
GridStore.prototype.destroySoon = function destroySoon() {
// as soon as write queue is drained, destroy.
// may call destroy immediately if no data is queued.
if(!this._q.length) {
return this.destroy();
}
this._destroying = true;
}
/**
* Handles the pipe part of the stream
* @ignore
*/
GridStore.prototype.pipe = function(destination, options) {
var self = this;
// Open the gridstore
this.open(function(err, result) {
if(err) _errorRead(self, err);
if(!self.readable) return;
// Set up the pipe
self._pipe(destination, options);
// Emit the stream is open
self.emit('open');
// Read from the stream
_read(self);
})
}
/**
* Internal module methods
* @ignore
*/
var _read = function _read(self) {
if (!self.readable || self.paused || self.reading) {
return;
}
self.reading = true;
var stream = self._stream = self.stream();
stream.paused = self.paused;
stream.on('data', function (data) {
if (self._decoder) {
var str = self._decoder.write(data);
if (str.length) self.emit('data', str);
} else {
self.emit('data', data);
}
});
stream.on('end', function (data) {
self.emit('end', data);
});
stream.on('error', function (data) {
_errorRead(self, data);
});
stream.on('close', function (data) {
self.emit('close', data);
});
self.pause = function () {
// native doesn't always pause.
// bypass its pause() method to hack it
self.paused = stream.paused = true;
}
self.resume = function () {
self.paused = false;
stream.resume();
self.readable = stream.readable;
}
self.destroy = function () {
self.readable = false;
stream.destroy();
}
}
/**
* pause
* @ignore
*/
GridStore.prototype.pause = function pause () {
// Overridden when the GridStore opens.
this.paused = true;
}
/**
* resume
* @ignore
*/
GridStore.prototype.resume = function resume () {
// Overridden when the GridStore opens.
this.paused = false;
}
/**
* Internal module methods
* @ignore
*/
var _flush = function _flush(self, _force) {
if (!self._opened) return;
if (!_force && self._flushing) return;
self._flushing = true;
// write the entire q to gridfs
if (!self._q.length) {
self._flushing = false;
self.emit('drain');
if(self._destroying) {
self.destroy();
}
return;
}
self.write(self._q.shift(), function (err, store) {
if (err) return _error(self, err);
self.emit('progress', store.position);
_flush(self, true);
});
}
var _openStream = function _openStream (self) {
if(self._opening == true) return;
self._opening = true;
// Open the store
self.open(function (err, gridstore) {
if (err) return _error(self, err);
self._opened = true;
self.emit('open');
_flush(self);
});
}
var _error = function _error(self, err) {
self.destroy();
self.emit('error', err);
}
var _errorRead = function _errorRead (self, err) {
self.readable = false;
self.emit('error', err);
}
/**
* @ignore
* @api private
*/
exports.GridStore = GridStore;

@@ -29,2 +29,3 @@ var Stream = require('stream').Stream,

this.completedLength = 0;
this.currentChunkNumber = 0;

@@ -36,2 +37,5 @@ this.paused = false;

// Calculate the number of chunks
this.numberOfChunks = Math.ceil(gstore.length/gstore.chunkSize);
var self = this;

@@ -77,13 +81,10 @@ process.nextTick(function() {

if ((gstore.currentChunk.length() - gstore.currentChunk.position + 1 + self.completedLength) >= self.finalLength) {
toRead = self.finalLength - self.completedLength;
self.executing = false;
last = true;
} else {
toRead = gstore.currentChunk.length();
if(gstore.currentChunk.chunkNumber >= (this.numberOfChunks - 1)) {
self.executing = false;
last = true;
}
var data = gstore.currentChunk.readSlice(toRead);
if(data != null) {
var data = gstore.currentChunk.readSlice(gstore.currentChunk.length());
if(data != null && gstore.currentChunk.chunkNumber == self.currentChunkNumber) {
self.currentChunkNumber = self.currentChunkNumber + 1;
self.completedLength += data.length;

@@ -129,3 +130,3 @@ self.pendingChunk = null;

gstore.currentChunk = self.pendingChunk;
self._execute();
self._execute();
});

@@ -169,16 +170,10 @@ }

}
this.paused = false;
var self = this;
if(self.pendingChunk != null) {
self.currentChunk = self.pendingChunk;
process.nextTick(function() {
self._execute();
});
} else {
self.readable = false;
self.emit("close");
}
process.nextTick(function() {
self._execute();
});
};
exports.ReadStream = ReadStream;

@@ -19,4 +19,6 @@ try {

, 'collection'
, 'connection/read_preference'
, 'connection/connection'
, 'connection/server'
, 'connection/mongos'
, 'connection/repl_set'

@@ -67,4 +69,6 @@ , 'cursor'

, 'collection'
, 'connection/read_preference'
, 'connection/connection'
, 'connection/server'
, 'connection/mongos'
, 'connection/repl_set'

@@ -119,4 +123,6 @@ , 'cursor'

, 'collection'
, 'connection/read_preference'
, 'connection/connection'
, 'connection/server'
, 'connection/mongos'
, 'connection/repl_set'

@@ -123,0 +129,0 @@ , 'cursor'

@@ -84,6 +84,10 @@ var Long = require('bson').Long;

} else {
// Parse documents
_self.index = bson.deserializeStream(binary_reply, _self.index, _batchSize, _self.documents, object_index);
// Adjust index
object_index = object_index + _batchSize;
try {
// Parse documents
_self.index = bson.deserializeStream(binary_reply, _self.index, _batchSize, _self.documents, object_index);
// Adjust index
object_index = object_index + _batchSize;
} catch (err) {
return callback(err);
}
}

@@ -102,19 +106,24 @@

}(this, binary_reply, batchSize, this.numberReturned)();
} else {
// Let's unpack all the bson documents, deserialize them and store them
for(var object_index = 0; object_index < this.numberReturned; object_index++) {
// Read the size of the bson object
var bsonObjectSize = binary_reply[this.index] | binary_reply[this.index + 1] << 8 | binary_reply[this.index + 2] << 16 | binary_reply[this.index + 3] << 24;
// If we are storing the raw responses to pipe straight through
if(raw) {
// Deserialize the object and add to the documents array
this.documents.push(binary_reply.slice(this.index, this.index + bsonObjectSize));
} else {
// Deserialize the object and add to the documents array
this.documents.push(bson.deserialize(binary_reply.slice(this.index, this.index + bsonObjectSize)));
}
// Adjust binary index to point to next block of binary bson data
this.index = this.index + bsonObjectSize;
}
} else {
try {
// Let's unpack all the bson documents, deserialize them and store them
for(var object_index = 0; object_index < this.numberReturned; object_index++) {
// Read the size of the bson object
var bsonObjectSize = binary_reply[this.index] | binary_reply[this.index + 1] << 8 | binary_reply[this.index + 2] << 16 | binary_reply[this.index + 3] << 24;
// If we are storing the raw responses to pipe straight through
if(raw) {
// Deserialize the object and add to the documents array
this.documents.push(binary_reply.slice(this.index, this.index + bsonObjectSize));
} else {
// Deserialize the object and add to the documents array
this.documents.push(bson.deserialize(binary_reply.slice(this.index, this.index + bsonObjectSize)));
}
// Adjust binary index to point to next block of binary bson data
this.index = this.index + bsonObjectSize;
}
} catch(err) {
return callback(err);
}
// No error return
callback(null);

@@ -121,0 +130,0 @@ }

{ "name" : "mongodb"
, "description" : "A node.js driver for MongoDB"
, "keywords" : ["mongodb", "mongo", "driver", "db"]
, "version" : "1.0.2"
, "version" : "1.1.0-beta"
, "author" : "Christian Amor Kvalheim <christkv@gmail.com>"

@@ -61,3 +61,3 @@ , "contributors" : [ "Aaron Heckmann",

, "dependencies" : {
"bson": "0.0.6"
"bson": "0.1.0"
}

@@ -68,3 +68,3 @@ , "devDependencies": {

, "ejs": "0.6.1"
, "nodeunit": "0.7.3"
, "nodeunit": "0.7.4"
, "github3": ">=0.3.0"

@@ -74,2 +74,3 @@ , "markdown": "0.3.1"

, "step": "0.0.5"
, "async": "0.1.22"
}

@@ -76,0 +77,0 @@ , "config": { "native" : false }

Sorry, the diff of this file is too big to display

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc