Socket
Socket
Sign inDemoInstall

mongodb

Package Overview
Dependencies
Maintainers
1
Versions
551
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

mongodb - npm Package Compare versions

Comparing version 0.9.6-23 to 0.9.7

lib/mongodb/connection/repl_set_servers.js

21

deps/gleak/History.md
0.2.1 / 10-18-2011
==================
* fixed; package.json dependency versioning
0.2.0 / 10-11-2011
==================
* added; node v0.5 / v0.6 support
0.1.3 / 09-22-2011
==================
* use old school node engine format in package.json
0.1.2 / 09-08-2011
==================
* changed; utilize detectNew in middleware
* updated; docs
0.1.1 / 09-07-2011

@@ -3,0 +24,0 @@ ==================

28

deps/gleak/index.js

@@ -16,3 +16,3 @@

exports.version = '0.1.1';
exports.version = '0.2.1';

@@ -42,9 +42,6 @@ /**

var known = [];
setTimeout(print, 1000);
function print () {
g.detect().forEach(function (leak) {
if (~known.indexOf(leak)) return;
known.push(leak);
g.detectNew().forEach(function (leak) {
stream.write(format.replace(/%s/, leak) + '\n');

@@ -84,2 +81,3 @@ });

// v0.4.x
Gleak.prototype.whitelist = [

@@ -94,4 +92,24 @@ setTimeout

, global
, GLOBAL
, root
];
// check for new globals in >= v0.5x
var version = process.version.replace(/^v/, '').split('.');
if ('0' === version[0] && version[1] > 4) {
Gleak.prototype.whitelist.push(
ArrayBuffer
, Int8Array
, Uint8Array
, Int16Array
, Uint16Array
, Int32Array
, Uint32Array
, Float32Array
, Float64Array
, DataView
, 'errno' // node >= v0.5.x hack
)
}
/**

@@ -98,0 +116,0 @@ * Default format.

8

deps/gleak/package.json

@@ -5,3 +5,3 @@ {

"description": "Node global variable leak detector",
"version": "0.1.1",
"version": "0.2.1",
"repository": {

@@ -16,9 +16,9 @@ "type": "git"

"engines": {
"node": "~v0.4.0"
"node": ">=0.4.0"
},
"dependencies": {},
"devDependencies": {
"express": "~v2.0.0"
, "expresso": "v0.7.5"
"express": ">=2.0.0"
, "expresso": "0.7.5"
}
}
# Gleak
Global variable leak detection for Node.js
var gleak = require('gleak');
var detector = require('gleak')();
gleak.detect().forEach(function (name) {
detector.detect().forEach(function (name) {
console.warn('found global leak: %s', name);

@@ -14,2 +14,19 @@ });

## Detectable
As demonstrated, gleak comes with the `detect` method which returns
an array of all found variable leaks.
Often times we want to run the detector many times, progressively
checking for any new leaks that occurred since we last checked. In
this scenario we can utilize the `detectNew` method.
var detector = require('gleak')();
x = 1;
detector.detectNew(); // ['x']
detector.detectNew(); // []
y = 3;
detector.detectNew(); // ['y']
## Configurable:

@@ -20,13 +37,31 @@

var gleak = require('gleak');
gleak.whitelist.push(app, db);
var gleak = require('gleak')();
gleak.ignore(app, db);
`gleak.whitelist` is an array that holds all globals we want to ignore.
Push to it or blow it away completely with your own list.
The `gleak.ignore` method allows us to add globals we want to ignore
while safely ignoring duplicates.
`gleak.whitelist` is an array that holds all globals we are ignoring.
You can push to it or blow it away completely with your own list too.
var gleak = require('gleak')();
gleak.whitelist = [dnode, cluster];
Changes to your whitelists do not impact any global settings. For example:
var gleak = require('gleak');
var g1 = gleak();
var g2 = gleak();
g1.ignore(myglobal);
g2.whitelist.indexOf(myglobal) === -1;
`g2` does not inherit changes to `g1`s whitelist.
## Printable
If you don't want anything fancy and want to quickly dump all
global leaks to your console, just call `print()`.
var gleak = require('gleak')();
gleak.print(); // prints "Gleak!: leakedVarName"

@@ -33,0 +68,0 @@

@@ -12,3 +12,3 @@

var sys = require('sys');
var sys = require('util');

@@ -15,0 +15,0 @@ /**

@@ -14,3 +14,3 @@ /*!

fs = require('fs'),
sys = require('sys'),
sys = require('util'),
track = require('../track'),

@@ -17,0 +17,0 @@ path = require('path');

@@ -14,3 +14,3 @@ /*!

fs = require('fs'),
sys = require('sys'),
sys = require('util'),
path = require('path'),

@@ -17,0 +17,0 @@ AssertionError = require('assert').AssertionError;

@@ -20,6 +20,7 @@ Collections

db.createCollection(name, callback)
db.createCollection([[name[, options]], callback)
where `name` is the name of the collection and `callback` is a callback function. `db` is the database object.
where `name` is the name of the collection, options a set of configuration parameters and `callback` is a callback function. `db` is the database object.
The first parameter for

@@ -34,2 +35,7 @@ the callback is the error object (null if no error) and the second one is the pointer to the newly created

## Creating collections options
Several options can be passed to the `createCollection` function with `options` parameter.
* `raw` - driver returns documents as bson binary Buffer objects, `default:false`
### Collection properties

@@ -84,6 +90,11 @@

db.collection("name", callback);
db.collection([[name[, options]], callback);
If strict mode is off, then a new collection is created if not already present.
## Selecting collections options
Several options can be passed to the `collection` function with `options` parameter.
* `raw` - driver returns documents as bson binary Buffer objects, `default:false`
## Renaming collections

@@ -90,0 +101,0 @@

@@ -21,5 +21,11 @@ Database

* `auto_reconnect` - to reconnect automatically, default is false
* `poolSize` - specify the number of connections in the pool, default is 1
* `auto_reconnect` - to reconnect automatically, `default:false`
* `poolSize` - specify the number of connections in the pool `default:1`
* `retryMiliSeconds` - specify the number of milliseconds between connection attempts `default:5000`
* `numberOfRetries` - specify the number of retries for connection attempts `default:3`
* `reaperInterval` - specify the number of milliseconds between each reaper attempt `default:1000`
* `reaperTimeout` - specify the number of milliseconds for timing out callbacks that don't return `default:30000`
* `raw` - driver expects Buffer raw bson document, `default:false`
## DB options

@@ -70,2 +76,10 @@

## Sharing the connections over multiple dbs
To share the connection pool across multiple databases you database instance has method `db`
db_connector.db(name)
this returns a new `db` instance that shares the connections off the previous instance but will send all commands to the database `name`. This allows for better control of resource usage in a multiple database scenario.
## Deleting a database

@@ -72,0 +86,0 @@

@@ -82,3 +82,3 @@ GridStore

gs.read([size[, offset]], callback)
gs.read([size], callback)

@@ -88,3 +88,2 @@ where

* `size` is the length of the data to be read
* `offset` is the position to start reading
* `callback` is a callback function with two parameters - error object (if an error occured) and data (binary string)

@@ -131,3 +130,3 @@

* `db` is the database object
* `filename` is the name of the file to be checked
* `filename` is the name of the file to be checked or a regular expression
* `callback` is a callback function with two parameters - an error object (if an error occured) and a boolean value indicating if the file exists or not

@@ -134,0 +133,0 @@

@@ -66,2 +66,3 @@ Inserting and updating

* `upsert` - if true and no records match the query, insert `update` as a new record
* `raw` - driver returns updated document as bson binary Buffer, `default:false`

@@ -68,0 +69,0 @@ ### Replacement object

@@ -20,2 +20,3 @@ Queries

* `options` - defines extra logic (sorting options, paging etc.)
* `raw` - driver returns documents as bson binary Buffer objects, `default:false`

@@ -22,0 +23,0 @@ The result for the query is actually a cursor object. This can be used directly or converted to an array.

@@ -14,4 +14,7 @@ var sys = require('util'),

DBRef = require('../../lib/mongodb/bson/bson').DBRef,
Symbol = require('../../lib/mongodb/bson/bson').Symbol,
Double = require('../../lib/mongodb/bson/bson').Double,
Timestamp = require('../../lib/mongodb/bson/bson').Timestamp,
assert = require('assert');
var Long2 = require('./bson').Long,

@@ -21,2 +24,5 @@ ObjectID2 = require('./bson').ObjectID,

Code2 = require('./bson').Code,
Symbol2 = require('./bson').Symbol,
Double2 = require('./bson').Double,
Timestamp2 = require('./bson').Timestamp,
DBRef2 = require('./bson').DBRef;

@@ -30,2 +36,34 @@

//
// Assert correct toJSON
//
var binary1 = new Binary(new Buffer('00000000000000000000000000000000'));
var binary2 = new Binary2(new Buffer('00000000000000000000000000000000'));
assert.equal(JSON.stringify(binary1), JSON.stringify(binary2));
var objectId = new ObjectID();
var dbref1 = new DBRef('test', objectId, 'db');
var dbref2 = new DBRef2('test', ObjectID2.createFromHexString(objectId.toHexString()), 'db');
assert.equal(JSON.stringify(dbref1), JSON.stringify(dbref2));
var symbol1 = new Symbol('hello');
var symbol2 = new Symbol2('hello');
assert.equal(JSON.stringify(symbol1), JSON.stringify(symbol2));
var double1 = new Double(3.232);
var double2 = new Double2(3.232);
assert.equal(JSON.stringify(double1), JSON.stringify(double2));
var code1 = new Code('hello', {a:1})
var code2 = new Code2('hello', {a:1})
assert.equal(JSON.stringify(code1), JSON.stringify(code2));
var long1 = Long.fromNumber(1000);
var long2 = Long2.fromNumber(1000);
assert.equal(JSON.stringify(long1), JSON.stringify(long2));
var timestamp1 = Timestamp.fromNumber(1000);
var timestamp2 = Timestamp2.fromNumber(1000);
assert.equal(JSON.stringify(timestamp1), JSON.stringify(timestamp2));
// Long data type tests

@@ -322,2 +360,16 @@ var l2_string = Long2.fromNumber(100);

// Serialize function
var doc = {
_id: 'testid',
key1: function() {}
}
var simple_string_serialized = BSONJS.serialize(doc, false, true, true);
var simple_string_serialized_2 = BSON.serialize(doc, false, true, true);
// Deserialize the string
var doc1 = BSONJS.deserialize(new Buffer(simple_string_serialized_2));
var doc2 = BSON.deserialize(new Buffer(simple_string_serialized_2));
assert.equal(doc1.key1.code.toString(), doc2.key1.code.toString())
// Force garbage collect

@@ -324,0 +376,0 @@ global.gc();

@@ -116,2 +116,6 @@

Binary.prototype.toJSON = function() {
return this.buffer != null ? this.buffer.toString('base64') : '';
}
Binary.BUFFER_SIZE = 256;

@@ -118,0 +122,0 @@

@@ -72,3 +72,3 @@ /**

// Experiment for performance
BSON.calculateObjectSize = function(object) {
BSON.calculateObjectSize = function(object, serializeFunctions) {
var totalLength = (4 + 1);

@@ -84,2 +84,4 @@ var done = false;

var finished = false;
// Ensure serialized functions set
serializeFunctions = serializeFunctions == null ? false : serializeFunctions;

@@ -129,23 +131,2 @@ while(!done) {

totalLength += (name != null ? (Buffer.byteLength(name) + 1) : 0) + (8 + 1);
// // Write the type
// buffer[index++] = BSON.BSON_DATA_NUMBER;
// // Write the name
// if(name != null) {
// index = index + buffer.write(name, index, 'utf8') + 1;
// buffer[index - 1] = 0;
// }
//
// // Write float
// ieee754.writeIEEE754(buffer, value, index, 'little', 52, 8);
// // Ajust index
// index = index + 8;
// } else if(typeof value == 'number' || toString.call(value) === '[object Number]') {
// if(value >= BSON.BSON_INT32_MAX || value < BSON.BSON_INT32_MIN ||
// value !== parseInt(value, 10)) {
// // Long and Number take same number of bytes.
// totalLength += (name != null ? (Buffer.byteLength(name) + 1) : 0) + (8 + 1);
// } else {
// totalLength += (name != null ? (Buffer.byteLength(name) + 1) : 0) + (4 + 1);
// }
} else if(typeof value == 'boolean' || toString.call(value) === '[object Boolean]') {

@@ -210,3 +191,3 @@ totalLength += (name != null ? (Buffer.byteLength(name) + 1) : 0) + (1 + 1);

totalLength += (name != null ? (Buffer.byteLength(name) + 1) : 0) + (Buffer.byteLength(value.value, 'utf8') + 4 + 1 + 1);
} else if(typeof value == 'function') {
} else if(typeof value == 'function' && serializeFunctions) {
// Calculate the length of the code string

@@ -249,3 +230,3 @@ totalLength += (name != null ? (Buffer.byteLength(name) + 1) : 0) + 4 + (Buffer.byteLength(value.toString(), 'utf8') + 1 + 1);

// In place serialization with index to starting point of serialization
BSON.serializeWithBufferAndIndex = function serializeWithBufferAndIndex(object, checkKeys, buffer, startIndex) {
BSON.serializeWithBufferAndIndex = function serializeWithBufferAndIndex(object, checkKeys, buffer, startIndex, serializeFunctions) {
if(null != object && 'object' === typeof object) {

@@ -268,2 +249,4 @@ // Encode the object using single allocated buffer and no recursion

var finished = false;
// Ensure serialized functions set
serializeFunctions = serializeFunctions == null ? false : serializeFunctions;

@@ -669,3 +652,3 @@ // Current parsing object state

buffer[index++] = 0;
} else if(typeof value == 'function') {
} else if(typeof value == 'function' && serializeFunctions) {
// Write the type

@@ -798,5 +781,5 @@ buffer[index++] = BSON.BSON_DATA_CODE;

BSON.serialize = function(object, checkKeys, asBuffer) {
var buffer = new Buffer(BSON.calculateObjectSize(object));
BSON.serializeWithBufferAndIndex(object, checkKeys, buffer, 0);
BSON.serialize = function(object, checkKeys, asBuffer, serializeFunctions) {
var buffer = new Buffer(BSON.calculateObjectSize(object, serializeFunctions));
BSON.serializeWithBufferAndIndex(object, checkKeys, buffer, 0, serializeFunctions);
return buffer;

@@ -1328,2 +1311,6 @@ }

Code.prototype.toJSON = function() {
return {scope:this.scope, code:this.code};
}
/**

@@ -1346,2 +1333,6 @@ * Symbol constructor.

Symbol.prototype.toJSON = function() {
return this.value;
}
/**

@@ -1374,7 +1365,7 @@ * MinKey constructor

DBRef.prototype.toJSON = function() {
return JSON.stringify({
return {
'$ref':this.namespace,
'$id':this.oid,
'$db':this.db == null ? '' : this.db
});
};
}

@@ -1381,0 +1372,0 @@

@@ -10,1 +10,5 @@ exports.Double = Double;

};
Double.prototype.toJSON = function() {
return this.value;
}

@@ -80,2 +80,4 @@ /**

this.slaveOk = options == null || options.slaveOk == null ? db.slaveOk : options.slaveOk;
this.serializeFunctions = options == null || options.serializeFunctions == null ? db.serializeFunctions : options.serializeFunctions;
this.raw = options == null || options.raw == null ? db.raw : options.raw;

@@ -187,3 +189,3 @@ this.pkFactory = pkFactory == null

// Execute command with safe options (rolls up both command and safe command into one and executes them on the same connection)
this.db.executeCommand(deleteCommand, commandOptions, function (err, error) {
this.db._executeRemoveCommand(deleteCommand, commandOptions, function (err, error) {
error = error && error.documents;

@@ -201,3 +203,3 @@ if(!callback) return;

} else {
var result = this.db.executeCommand(deleteCommand);
var result = this.db._executeRemoveCommand(deleteCommand);
// If no callback just return

@@ -248,3 +250,3 @@ if (!callback) return;

if(!('function' === typeof callback)) callback = null;
// Insert options (flags for insert)

@@ -256,2 +258,10 @@ var insertFlags = {};

}
// Either use override on the function, or go back to default on either the collection
// level or db
if(options['serializeFunctions'] != null) {
insertFlags['serializeFunctions'] = options['serializeFunctions'];
} else {
insertFlags['serializeFunctions'] = this.serializeFunctions;
}

@@ -266,5 +276,5 @@ // Pass in options

var doc = docs[index];
// Add id to each document if it's not already defined
if (!doc['_id'] && this.db.forceServerObjectId != true) {
if (!(doc instanceof Buffer) && !doc['_id'] && this.db.forceServerObjectId != true) {
doc['_id'] = this.pkFactory.createPk();

@@ -283,4 +293,3 @@ }

// Default command options
var commandOptions = {};
var commandOptions = {};
// If safe is defined check for error message

@@ -305,3 +314,3 @@ // if(options != null && (options.safe == true || this.db.strict == true || this.opts.safe == true)) {

// Execute command with safe options (rolls up both command and safe command into one and executes them on the same connection)
this.db.executeCommand(insertCommand, commandOptions, function (err, error) {
this.db._executeInsertCommand(insertCommand, commandOptions, function (err, error) {
error = error && error.documents;

@@ -319,3 +328,3 @@ if(!callback) return;

} else {
var result = this.db.executeCommand(insertCommand, commandOptions);
var result = this.db._executeInsertCommand(insertCommand, commandOptions);
// If no callback just return

@@ -386,2 +395,10 @@ if(!callback) return;

// Either use override on the function, or go back to default on either the collection
// level or db
if(options['serializeFunctions'] != null) {
options['serializeFunctions'] = options['serializeFunctions'];
} else {
options['serializeFunctions'] = this.serializeFunctions;
}
var updateCommand = new UpdateCommand(

@@ -399,3 +416,3 @@ this.db

errorOptions = errorOptions == null && this.db.strict != null ? this.db.strict : errorOptions;
// If we are executing in strict mode or safe both the update and the safe command must happen on the same line

@@ -418,3 +435,3 @@ if(errorOptions && errorOptions != false) {

// Execute command with safe options (rolls up both command and safe command into one and executes them on the same connection)
this.db.executeCommand(updateCommand, commandOptions, function (err, error) {
this.db._executeUpdateCommand(updateCommand, commandOptions, function (err, error) {
error = error && error.documents;

@@ -432,3 +449,4 @@ if(!callback) return;

} else {
var result = this.db.executeCommand(updateCommand);
// Execute update
var result = this.db._executeUpdateCommand(updateCommand);
// If no callback just return

@@ -463,3 +481,3 @@ if (!callback) return;

this.db.executeCommand(cmd, {read:true}, function (err, result) {
this.db._executeQueryCommand(cmd, {read:true}, function (err, result) {
if (err) {

@@ -509,3 +527,3 @@ return callback(err);

var self = this;
this.db.executeCommand(queryCommand, {read:true}, function (err, result) {
this.db._executeQueryCommand(queryCommand, {read:true}, function (err, result) {
result = result && result.documents;

@@ -573,2 +591,10 @@ if(!callback) return;

}
// Either use override on the function, or go back to default on either the collection
// level or db
if(options['serializeFunctions'] != null) {
options['serializeFunctions'] = options['serializeFunctions'];
} else {
options['serializeFunctions'] = this.serializeFunctions;
}

@@ -582,8 +608,7 @@ // Unpack the error options if any

var connection = this.db.serverConfig.checkoutWriter();
var rawConnection = connection.getConnection().connection;
// Ensure we execute against the same raw connection so we can get the correct error
// result after the execution of the findAndModify finishes
this.db.executeDbCommand(queryObject, {writer:rawConnection, allReturn:true, safe:errorOptions}, function (err, result) {
result = result && result.documents;
this.db.executeDbCommand(queryObject, {writer:connection, serializeFunctions:options['serializeFunctions']}, function (err, firstResult) {
firstResult = firstResult && firstResult.documents;
if(!callback) return;

@@ -593,6 +618,17 @@

callback(err);
} else if(!result[0].ok) {
callback(self.db.wrap(result[0]));
} else if(!firstResult[0].ok) {
callback(self.db.wrap(firstResult[0]));
} else {
return callback(null, result[0].value);
// If we have a request for a last error command
if(errorOptions != null && errorOptions != false) {
self.db.lastError(errorOptions, {writer:connection}, function(err, secondResult) {
if(secondResult[0].err != null) {
callback(self.db.wrap(secondResult[0]));
} else {
return callback(null, firstResult[0].value);
}
})
} else {
return callback(null, firstResult[0].value);
}
}

@@ -636,3 +672,3 @@ });

// backwards compat for options object
var test = ['limit','sort','fields','skip','hint','explain','snapshot','timeout','tailable', 'batchSize']
var test = ['limit','sort','fields','skip','hint','explain','snapshot','timeout','tailable', 'batchSize', 'raw']
, is_option = false;

@@ -659,3 +695,32 @@

if (options && options.fields) {
// Validate correctness off the selector
var object = selector;
if(object instanceof Buffer) {
var object_size = object[0] | object[1] << 8 | object[2] << 16 | object[3] << 24;
if(object_size != object.length) {
var error = new Error("query selector raw message size does not match message header size [" + object.length + "] != [" + object_size + "]");
error.name = 'MongoError';
throw error;
}
}
// Validate correctness of the field selector
var object = fields;
if(object instanceof Buffer) {
var object_size = object[0] | object[1] << 8 | object[2] << 16 | object[3] << 24;
if(object_size != object.length) {
var error = new Error("query fields raw message size does not match message header size [" + object.length + "] != [" + object_size + "]");
error.name = 'MongoError';
throw error;
}
}
// Check special case where we are using an objectId
if(selector instanceof this.db.bson_serializer.ObjectID) {
selector = {_id:selector};
}
// If it's a serialized fields field we need to just let it through
// user be warned it better be good
if (options && options.fields && !(options.fields instanceof Buffer)) {
fields = {};

@@ -676,5 +741,5 @@ if (Array.isArray(options.fields)) {

if (!options) options = {};
options.skip = len > 3 ? args[2] : options.skip ? options.skip : 0;
options.limit = len > 3 ? args[3] : options.limit ? options.limit : 0;
options.raw = options.raw != null && typeof options.raw === 'boolean' ? options.raw : this.raw;
options.hint = options.hint != null ? this.normalizeHintField(options.hint) : this.internalHint;

@@ -684,11 +749,10 @@ options.timeout = len == 5 ? args[4] : typeof options.timeout === 'undefined' ? undefined : options.timeout;

options.slaveOk = options.slaveOk != null ? options.slaveOk : this.db.slaveOk;
var o = options;
// callback for backward compatibility
if (callback) {
// TODO refactor Cursor args
callback(null, new Cursor(this.db, this, selector, fields, o.skip, o.limit, o.sort, o.hint, o.explain, o.snapshot, o.timeout, o.tailable, o.batchSize, o.slaveOk));
callback(null, new Cursor(this.db, this, selector, fields, o.skip, o.limit, o.sort, o.hint, o.explain, o.snapshot, o.timeout, o.tailable, o.batchSize, o.slaveOk, o.raw));
} else {
return new Cursor(this.db, this, selector, fields, o.skip, o.limit, o.sort, o.hint, o.explain, o.snapshot, o.timeout, o.tailable, o.batchSize, o.slaveOk);
return new Cursor(this.db, this, selector, fields, o.skip, o.limit, o.sort, o.hint, o.explain, o.snapshot, o.timeout, o.tailable, o.batchSize, o.slaveOk, o.raw);
}

@@ -738,66 +802,145 @@ };

*/
/**
* Various argument possibilities
* TODO : combine/reduce # of possibilities
* 1 callback?
* 2 selector, callback?,
* 2 callback?, options // really?!
* 3 selector, fields, callback?
* 3 selector, options, callback?
* 4,selector, fields, options, callback?
* 5 selector, fields, skip, limit, callback?
* 6 selector, fields, skip, limit, timeout, callback?
*
* Available options:
* limit, sort, fields, skip, hint, explain, snapshot, timeout, tailable, batchSize
*/
Collection.prototype.findOne = function findOne (queryObject, options, callback) {
Collection.prototype.findOne = function findOne () {
var self = this;
if ('function' === typeof queryObject) {
callback = queryObject;
queryObject = {};
options = {};
} else if ('function' === typeof options) {
callback = options;
options = {};
var options
, args = Array.prototype.slice.call(arguments, 0)
, has_callback = typeof args[args.length - 1] === 'function'
, has_weird_callback = typeof args[0] === 'function'
, callback = has_callback ? args.pop() : (has_weird_callback ? args.shift() : null)
, len = args.length
, selector = len >= 1 ? args[0] : {}
, fields = len >= 2 ? args[1] : undefined;
if (len === 1 && has_weird_callback) {
// backwards compat for callback?, options case
selector = {};
options = args[0];
}
var fields;
if (len === 2) {
// backwards compat for options object
var test = ['limit','sort','fields','skip','hint','explain','snapshot','timeout','tailable', 'batchSize', 'raw']
, is_option = false;
if (options.fields && Array.isArray(options.fields)) {
fields = {};
if (0 === options.fields.length) {
fields['_id'] = 1;
} else {
for (var i = 0, len = options.fields.length; i < len; ++i) {
fields[options.fields[i]] = 1;
for (var idx = 0, l = test.length; idx < l; ++idx) {
if (test[idx] in fields) {
is_option = true;
break;
}
}
} else {
fields = options.fields;
if (is_option) {
options = fields;
fields = undefined;
} else {
options = {};
}
}
if (queryObject instanceof this.db.bson_serializer.ObjectID ||
'[object ObjectID]' === toString.call(queryObject)) {
queryObject = { '_id': queryObject };
if (3 === len) {
options = args[2];
}
var query = { 'query': queryObject };
// If we have a timeout set
var timeout = null != options.timeout
? options.timeout
: QueryCommand.OPTS_NONE;
// Make sure we can do slaveOk commands
if (options.slaveOk || this.slaveOk || this.db.slaveOk) {
timeout |= QueryCommand.OPTS_SLAVE;
// Validate correctness off the selector
var object = selector;
if(object instanceof Buffer) {
var object_size = object[0] | object[1] << 8 | object[2] << 16 | object[3] << 24;
if(object_size != object.length) {
var error = new Error("query selector raw message size does not match message header size [" + object.length + "] != [" + object_size + "]");
error.name = 'MongoError';
throw error;
}
}
// Validate correctness of the field selector
var object = fields;
if(object instanceof Buffer) {
var object_size = object[0] | object[1] << 8 | object[2] << 16 | object[3] << 24;
if(object_size != object.length) {
var error = new Error("query fields raw message size does not match message header size [" + object.length + "] != [" + object_size + "]");
error.name = 'MongoError';
throw error;
}
}
// Check special case where we are using an objectId
if(selector instanceof this.db.bson_serializer.ObjectID) {
selector = {_id:selector};
}
var collectionName = (this.db.databaseName ? this.db.databaseName + '.' : '')
+ this.collectionName;
var queryCommand = new QueryCommand(
this.db
, collectionName
, timeout
, 0
, 1
, query
, fields);
this.db.executeCommand(queryCommand, {read:true}, function (err, result) {
if (!err && result.documents[0] && result.documents[0]['$err']) {
return callback(self.db.wrap(result));
// If it's a serialized fields field we need to just let it through
// user be warned it better be good
if (options && options.fields && !(options.fields instanceof Buffer)) {
fields = {};
if (Array.isArray(options.fields)) {
if (!options.fields.length) {
fields['_id'] = 1;
} else {
for (var i = 0, l = options.fields.length; i < l; i++) {
fields[options.fields[i]] = 1;
}
}
} else {
fields = options.fields;
}
}
callback(err, result && result.documents && result.documents.length > 0 ? result.documents[0] : null);
if (!options) options = {};
options.skip = len > 3 ? args[2] : options.skip ? options.skip : 0;
options.limit = len > 3 ? args[3] : options.limit ? options.limit : 0;
options.raw = options.raw != null && typeof options.raw === 'boolean' ? options.raw : this.raw;
options.hint = options.hint != null ? this.normalizeHintField(options.hint) : this.internalHint;
options.timeout = len == 5 ? args[4] : typeof options.timeout === 'undefined' ? undefined : options.timeout;
// If we have overridden slaveOk otherwise use the default db setting
options.slaveOk = options.slaveOk != null ? options.slaveOk : this.db.slaveOk;
// Create cursor instance
var o = options;
var cursor = new Cursor(this.db, this, selector, fields, o.skip, 1, o.sort, o.hint, o.explain, o.snapshot, o.timeout, o.tailable, o.batchSize, o.slaveOk, o.raw);
cursor.toArray(function(err, items) {
if(err != null) return callback(err instanceof Error ? err : self.db.wrap(new Error(err)), null);
if(items.length == 1) return callback(null, items[0]);
callback(null, null);
});
// // callback for backward compatibility
// if (callback) {
// // TODO refactor Cursor args
// callback(null, new Cursor(this.db, this, selector, fields, o.skip, o.limit, o.sort, o.hint, o.explain, o.snapshot, o.timeout, o.tailable, o.batchSize, o.slaveOk, o.raw));
// } else {
// return new Cursor(this.db, this, selector, fields, o.skip, o.limit, o.sort, o.hint, o.explain, o.snapshot, o.timeout, o.tailable, o.batchSize, o.slaveOk, o.raw);
// }
// var self = this;
// // Get all the arguments
// var args = Array.prototype.slice.call(arguments, 0);
// // Retrieve the callback
// var callback = args.pop();
// try {
// // Call find function with one item
// this.find.apply(this, args).limit(1).toArray(function(err, items) {
// if(err != null) return callback(err instanceof Error ? err : self.db.wrap(new Error(err)), null);
// if(items.length == 1) return callback(null, items[0]);
// callback(null, null);
// });
//
// }catch(err) {
// console.log("----------------------------------- 444444444444444444444444444444444")
// console.dir(err)
// }
};

@@ -922,3 +1065,3 @@

this.db.executeCommand(cmd, {read:true}, function (err, result) {
this.db._executeQueryCommand(cmd, {read:true}, function (err, result) {
if (err) {

@@ -1040,3 +1183,3 @@ return callback(err);

this.db.executeCommand(cmd, {read:true}, function (err, result) {
this.db._executeQueryCommand(cmd, {read:true}, function (err, result) {
if (err) return callback(err);

@@ -1043,0 +1186,0 @@

@@ -17,2 +17,7 @@ var BinaryParser = require('../bson/binary_parser').BinaryParser,

BaseCommand.prototype.updateRequestId = function() {
this.requestId = id++;
return this.requestId;
};
// OpCodes

@@ -19,0 +24,0 @@ BaseCommand.OP_REPLY = 1;

@@ -11,5 +11,4 @@ var QueryCommand = require('./query_command').QueryCommand,

**/
var DbCommand = exports.DbCommand = function(db, collectionName, queryOptions, numberToSkip, numberToReturn, query, returnFieldSelector) {
QueryCommand.call(db, this);
var DbCommand = exports.DbCommand = function(dbInstance, collectionName, queryOptions, numberToSkip, numberToReturn, query, returnFieldSelector, options) {
QueryCommand.call(this);
this.collectionName = collectionName;

@@ -21,3 +20,10 @@ this.queryOptions = queryOptions;

this.returnFieldSelector = returnFieldSelector;
this.db = db;
this.db = dbInstance;
// Make sure we don't get a null exception
options = options == null ? {} : options;
// Let us defined on a command basis if we want functions to be serialized or not
if(options['serializeFunctions'] != null && options['serializeFunctions']) {
this.serializeFunctions = true;
}
};

@@ -34,2 +40,7 @@

// New commands
DbCommand.NcreateIsMasterCommand = function(db, databaseName) {
return new DbCommand(db, databaseName + "." + DbCommand.SYSTEM_COMMAND_COLLECTION, QueryCommand.OPTS_NO_CURSOR_TIMEOUT, 0, -1, {'ismaster':1}, null);
};
// Provide constructors for different db commands

@@ -185,4 +196,4 @@ DbCommand.createIsMasterCommand = function(db) {

DbCommand.createDbCommand = function(db, command_hash) {
return new DbCommand(db, db.databaseName + "." + DbCommand.SYSTEM_COMMAND_COLLECTION, QueryCommand.OPTS_NO_CURSOR_TIMEOUT, 0, -1, command_hash, null);
DbCommand.createDbCommand = function(db, command_hash, options) {
return new DbCommand(db, db.databaseName + "." + DbCommand.SYSTEM_COMMAND_COLLECTION, QueryCommand.OPTS_NO_CURSOR_TIMEOUT, 0, -1, command_hash, null, options);
};

@@ -189,0 +200,0 @@

@@ -11,2 +11,13 @@ var BaseCommand = require('./base_command').BaseCommand,

BaseCommand.call(this);
// Validate correctness off the selector
var object = selector;
if(object instanceof Buffer) {
var object_size = object[0] | object[1] << 8 | object[2] << 16 | object[3] << 24;
if(object_size != object.length) {
var error = new Error("delete raw message size does not match message header size [" + object.length + "] != [" + object_size + "]");
error.name = 'MongoError';
throw error;
}
}

@@ -79,5 +90,16 @@ this.collectionName = collectionName;

_command[_index++] = 0;
// Document binary length
var documentLength = 0
// Serialize the selector
// If we are passing a raw buffer, do minimal validation
if(this.selector instanceof Buffer) {
documentLength = this.selector.length;
// Copy the data into the current buffer
this.selector.copy(_command, _index);
} else {
documentLength = this.db.bson_serializer.BSON.serializeWithBufferAndIndex(this.selector, this.checkKeys, _command, _index) - _index + 1;
}
// Serialize the selector
var documentLength = this.db.bson_serializer.BSON.serializeWithBufferAndIndex(this.selector, this.checkKeys, _command, _index) - _index + 1;
// Write the length to the document

@@ -84,0 +106,0 @@ _command[_index + 3] = (documentLength >> 24) & 0xff;

@@ -24,4 +24,2 @@ var BaseCommand = require('./base_command').BaseCommand,

GetMoreCommand.prototype.toBinary = function() {
// debug("======================================================= GETMORE")
// debug("================ " + this.db.bson_serializer.BSON.calculateObjectSize(this.query))
// Calculate total length of the document

@@ -28,0 +26,0 @@ var totalLengthOfCommand = 4 + Buffer.byteLength(this.collectionName) + 1 + 4 + 8 + (4 * 4);

@@ -17,2 +17,3 @@ var BaseCommand = require('./base_command').BaseCommand,

this.flags = 0;
this.serializeFunctions = false;

@@ -27,2 +28,7 @@ // Ensure valid options hash

}
// Let us defined on a command basis if we want functions to be serialized or not
if(options['serializeFunctions'] != null && options['serializeFunctions']) {
this.serializeFunctions = true;
}
};

@@ -36,2 +42,11 @@

InsertCommand.prototype.add = function(document) {
if(document instanceof Buffer) {
var object_size = document[0] | document[1] << 8 | document[2] << 16 | document[3] << 24;
if(object_size != document.length) {
var error = new Error("insert raw message size does not match message header size [" + document.length + "] != [" + object_size + "]");
error.name = 'MongoError';
throw error;
}
}
this.documents.push(document);

@@ -54,4 +69,8 @@ return this;

for(var i = 0; i < this.documents.length; i++) {
// Calculate size of document
totalLengthOfCommand += this.db.bson_serializer.BSON.calculateObjectSize(this.documents[i]);
if(this.documents[i] instanceof Buffer) {
totalLengthOfCommand += this.documents[i].length;
} else {
// Calculate size of document
totalLengthOfCommand += this.db.bson_serializer.BSON.calculateObjectSize(this.documents[i], this.serializeFunctions);
}
}

@@ -98,7 +117,20 @@

_command[_index - 1] = 0;
// Write all the bson documents to the buffer at the index offset
for(var i = 0; i < this.documents.length; i++) {
// Serialize the document straight to the buffer
var documentLength = this.db.bson_serializer.BSON.serializeWithBufferAndIndex(this.documents[i], this.checkKeys, _command, _index) - _index + 1;
// Document binary length
var documentLength = 0
var object = this.documents[i];
// Serialize the selector
// If we are passing a raw buffer, do minimal validation
if(object instanceof Buffer) {
documentLength = object.length;
// Copy the data into the current buffer
object.copy(_command, _index);
} else {
// Serialize the document straight to the buffer
documentLength = this.db.bson_serializer.BSON.serializeWithBufferAndIndex(object, this.checkKeys, _command, _index, this.serializeFunctions) - _index + 1;
}
// Write the length to the document

@@ -105,0 +137,0 @@ _command[_index + 3] = (documentLength >> 24) & 0xff;

@@ -10,5 +10,29 @@ var BaseCommand = require('./base_command').BaseCommand,

**/
var QueryCommand = exports.QueryCommand = function(db, collectionName, queryOptions, numberToSkip, numberToReturn, query, returnFieldSelector) {
var QueryCommand = exports.QueryCommand = function(db, collectionName, queryOptions, numberToSkip, numberToReturn, query, returnFieldSelector, options) {
BaseCommand.call(this);
// Validate correctness off the selector
var object = query;
if(object instanceof Buffer) {
var object_size = object[0] | object[1] << 8 | object[2] << 16 | object[3] << 24;
if(object_size != object.length) {
var error = new Error("query selector raw message size does not match message header size [" + object.length + "] != [" + object_size + "]");
error.name = 'MongoError';
throw error;
}
}
var object = returnFieldSelector;
if(object instanceof Buffer) {
var object_size = object[0] | object[1] << 8 | object[2] << 16 | object[3] << 24;
if(object_size != object.length) {
var error = new Error("query fields raw message size does not match message header size [" + object.length + "] != [" + object_size + "]");
error.name = 'MongoError';
throw error;
}
}
// Make sure we don't get a null exception
options = options == null ? {} : options;
// Set up options
this.collectionName = collectionName;

@@ -21,2 +45,7 @@ this.queryOptions = queryOptions;

this.db = db;
// Let us defined on a command basis if we want functions to be serialized or not
if(options['serializeFunctions'] != null && options['serializeFunctions']) {
this.serializeFunctions = true;
}
};

@@ -40,12 +69,17 @@

QueryCommand.prototype.toBinary = function() {
// debug("======================================================= QUERY")
// debug("================ " + this.db.bson_serializer.BSON.calculateObjectSize(this.query))
var totalLengthOfCommand = 0;
// Calculate total length of the document
if(this.query instanceof Buffer) {
totalLengthOfCommand = 4 + Buffer.byteLength(this.collectionName) + 1 + 4 + 4 + this.query.length + (4 * 4);
} else {
totalLengthOfCommand = 4 + Buffer.byteLength(this.collectionName) + 1 + 4 + 4 + this.db.bson_serializer.BSON.calculateObjectSize(this.query, this.serializeFunctions) + (4 * 4);
}
// Calculate total length of the document
var totalLengthOfCommand = 4 + Buffer.byteLength(this.collectionName) + 1 + 4 + 4 + this.db.bson_serializer.BSON.calculateObjectSize(this.query) + (4 * 4);
// Calculate extra fields size
if(this.returnFieldSelector != null) {
if(this.returnFieldSelector != null && !(this.returnFieldSelector instanceof Buffer)) {
if(Object.keys(this.returnFieldSelector).length > 0) {
totalLengthOfCommand += this.db.bson_serializer.BSON.calculateObjectSize(this.returnFieldSelector);
totalLengthOfCommand += this.db.bson_serializer.BSON.calculateObjectSize(this.returnFieldSelector, this.serializeFunctions);
}
} else if(this.returnFieldSelector instanceof Buffer) {
totalLengthOfCommand += this.returnFieldSelector.length;
}

@@ -110,7 +144,17 @@

_index = _index + 4;
// Document binary length
var documentLength = 0
var object = this.query;
// Serialize the selector
if(object instanceof Buffer) {
documentLength = object.length;
// Copy the data into the current buffer
object.copy(_command, _index);
} else {
// Serialize the document straight to the buffer
documentLength = this.db.bson_serializer.BSON.serializeWithBufferAndIndex(object, this.checkKeys, _command, _index, this.serializeFunctions) - _index + 1;
}
// Serialize the query document straight to the buffer
var documentLength = this.db.bson_serializer.BSON.serializeWithBufferAndIndex(this.query, this.checkKeys, _command, _index) - _index + 1;
// debug(inspect("===================== documentLength :: " + documentLength))
// Write the length to the document

@@ -127,5 +171,5 @@ _command[_index + 3] = (documentLength >> 24) & 0xff;

// Push field selector if available
if(this.returnFieldSelector != null) {
if(this.returnFieldSelector != null && !(this.returnFieldSelector instanceof Buffer)) {
if(Object.keys(this.returnFieldSelector).length > 0) {
var documentLength = this.db.bson_serializer.BSON.serializeWithBufferAndIndex(this.returnFieldSelector, this.checkKeys, _command, _index) - _index + 1;
var documentLength = this.db.bson_serializer.BSON.serializeWithBufferAndIndex(this.returnFieldSelector, this.checkKeys, _command, _index, this.serializeFunctions) - _index + 1;
// Write the length to the document

@@ -141,7 +185,24 @@ _command[_index + 3] = (documentLength >> 24) & 0xff;

}
} if(this.returnFieldSelector != null && this.returnFieldSelector instanceof Buffer) {
// Document binary length
var documentLength = 0
var object = this.returnFieldSelector;
// Serialize the selector
documentLength = object.length;
// Copy the data into the current buffer
object.copy(_command, _index);
// Write the length to the document
_command[_index + 3] = (documentLength >> 24) & 0xff;
_command[_index + 2] = (documentLength >> 16) & 0xff;
_command[_index + 1] = (documentLength >> 8) & 0xff;
_command[_index] = documentLength & 0xff;
// Update index in buffer
_index = _index + documentLength;
// Add terminating 0 for the object
_command[_index - 1] = 0;
}
// debug("------------------------------------------------------------------------")
// debug(inspect(_command))
// Return finished command
return _command;

@@ -148,0 +209,0 @@ };

@@ -12,2 +12,22 @@ var BaseCommand = require('./base_command').BaseCommand,

var object = spec;
if(object instanceof Buffer) {
var object_size = object[0] | object[1] << 8 | object[2] << 16 | object[3] << 24;
if(object_size != object.length) {
var error = new Error("update spec raw message size does not match message header size [" + object.length + "] != [" + object_size + "]");
error.name = 'MongoError';
throw error;
}
}
var object = document;
if(object instanceof Buffer) {
var object_size = object[0] | object[1] << 8 | object[2] << 16 | object[3] << 24;
if(object_size != object.length) {
var error = new Error("update document raw message size does not match message header size [" + object.length + "] != [" + object_size + "]");
error.name = 'MongoError';
throw error;
}
}
this.collectionName = collectionName;

@@ -17,2 +37,3 @@ this.spec = spec;

this.db = db;
this.serializeFunctions = false;

@@ -27,2 +48,6 @@ // Generate correct flags

this.flags = parseInt(db_multi_update.toString() + db_upsert.toString(), 2);
// Let us defined on a command basis if we want functions to be serialized or not
if(options['serializeFunctions'] != null && options['serializeFunctions']) {
this.serializeFunctions = true;
}
};

@@ -46,4 +71,4 @@

// Calculate total length of the document
var totalLengthOfCommand = 4 + Buffer.byteLength(this.collectionName) + 1 + 4 + this.db.bson_serializer.BSON.calculateObjectSize(this.spec) +
this.db.bson_serializer.BSON.calculateObjectSize(this.document) + (4 * 4);
var totalLengthOfCommand = 4 + Buffer.byteLength(this.collectionName) + 1 + 4 + this.db.bson_serializer.BSON.calculateObjectSize(this.spec, false) +
this.db.bson_serializer.BSON.calculateObjectSize(this.document, this.serializeFunctions) + (4 * 4);

@@ -98,4 +123,18 @@ // Let's build the single pass buffer command

// Serialize the spec document
var documentLength = this.db.bson_serializer.BSON.serializeWithBufferAndIndex(this.spec, this.checkKeys, _command, _index) - _index + 1;
// Document binary length
var documentLength = 0
var object = this.spec;
// Serialize the selector
// If we are passing a raw buffer, do minimal validation
if(object instanceof Buffer) {
var object_size = object[0] | object[1] << 8 | object[2] << 16 | object[3] << 24;
if(object_size != object.length) throw new Error("raw message size does not match message header size [" + object.length + "] != [" + object_size + "]");
documentLength = object.length;
// Copy the data into the current buffer
object.copy(_command, _index);
} else {
documentLength = this.db.bson_serializer.BSON.serializeWithBufferAndIndex(object, this.checkKeys, _command, _index, false) - _index + 1;
}
// Write the length to the document

@@ -111,4 +150,18 @@ _command[_index + 3] = (documentLength >> 24) & 0xff;

// Document binary length
var documentLength = 0
var object = this.document;
// Serialize the document
var documentLength = this.db.bson_serializer.BSON.serializeWithBufferAndIndex(this.document, this.checkKeys, _command, _index) - _index + 1;
// If we are passing a raw buffer, do minimal validation
if(object instanceof Buffer) {
var object_size = object[0] | object[1] << 8 | object[2] << 16 | object[3] << 24;
if(object_size != object.length) throw new Error("raw message size does not match message header size [" + object.length + "] != [" + object_size + "]");
documentLength = object.length;
// Copy the data into the current buffer
object.copy(_command, _index);
} else {
documentLength = this.db.bson_serializer.BSON.serializeWithBufferAndIndex(object, this.checkKeys, _command, _index, this.serializeFunctions) - _index + 1;
}
// Write the length to the document

@@ -115,0 +168,0 @@ _command[_index + 3] = (documentLength >> 24) & 0xff;

var utils = require('./connection_utils'),
inherits = require('util').inherits,
net = require('net'),
EventEmitter = require("events").EventEmitter,
EventEmitter = require('events').EventEmitter,
inherits = require('util').inherits,
MongoReply = require("../responses/mongo_reply").MongoReply,
Connection = require("./connection").Connection;
var ConnectionPool = exports.ConnectionPool = function(host, port, poolSize, socketOptions) {
if(typeof host !== 'string' || typeof port !== 'number') throw "host and port must be specified";
var ConnectionPool = exports.ConnectionPool = function(host, port, poolSize, bson, socketOptions) {
if(typeof host !== 'string' || typeof port !== 'number') throw "host and port must be specified [" + host + ":" + port + "]";
// Set up event emitter
EventEmitter.call(this);
// Keep all options for the socket in a specific collection allowing the user to specify the

@@ -15,2 +19,3 @@ // Wished upon socket connection parameters

this.socketOptions.poolSize = poolSize;
this.bson = bson;

@@ -39,29 +44,72 @@ // Set host variable or default

this.openConnections = {};
// Assign connection id's
this.connectionId = 0;
// Just keeps list of events we allow
// this.eventHandlers = {error:[], parseError:[], poolReady:[], message:[], close:[]};
// Current connection to pick
this.currentConnectionIndex = 0;
// The pool state
this._poolState = 'not connected';
}
// Inherit event emitter so we can emit stuff wohoo
inherits(ConnectionPool, EventEmitter);
ConnectionPool.prototype.setMaxBsonSize = function(maxBsonSize) {
var keys = Object.keys(this.openConnections);
for(var i = 0; i < keys.length; i++) {
this.openConnections[keys[i]].maxBsonSize = maxBsonSize;
}
}
// Creates handlers
var connectHandler = function(self) {
return function(err) {
// this references the connection object
if(err) {
return function(err, connection) {
// Ensure we don't fire same error message multiple times
var fireError = true;
var performedOperation = false;
// if we have an error and we have not already put the connection into the list of connections with errors
if(err && Object.keys(self.waitingToOpen).length > 0 && self.openConnections[connection.id] == null && self.connectionsWithErrors[connection.id] == null) {
// Add to list of error connections
self.connectionsWithErrors[this.id] = this;
self.connectionsWithErrors[connection.id] = connection;
// Remove from list of waiting to connect
delete self.waitingToOpen[this.id];
// Emit error so rest of code knows
self.emit("error", err, this);
} else {
delete self.waitingToOpen[connection.id];
// Ensure we only fire an error if there was an operation to avoid duplicate errors
performedOperation = true;
} else if(err && self.openConnections[connection.id] != null){
// Add to list of error connections
self.connectionsWithErrors[connection.id] = connection;
// Remove from list of open connections
delete self.openConnections[connection.id];
// Ensure we only fire an error if there was an operation to avoid duplicate errors
performedOperation = true;
} else if(!err && self.waitingToOpen[connection.id] != null){
// Remove from list of waiting to connect
delete self.waitingToOpen[this.id];
delete self.waitingToOpen[connection.id];
// Add to list of open connections
self.openConnections[this.id] = this;
}
self.openConnections[connection.id] = connection;
// Ensure we only fire an error if there was an operation to avoid duplicate errors
performedOperation = true;
} else {
fireError = false;
}
// Check if we are done meaning that the number of openconnections + errorconnections
if(Object.keys(self.waitingToOpen).length == 0) {
// Emit pool is ready
self.emit("poolReady");
if(Object.keys(self.waitingToOpen).length == 0 && performedOperation) {
// If we have any errors notify the application, only fire if we don't have the element already in
// errors
if(Object.keys(self.connectionsWithErrors).length > 0 && fireError) {
// Set pool type to not connected
self._poolState = 'not connected';
// Emit error
self.emit("error", err, connection);
} else {
// Set pool state to connecting
self._poolState = 'connected';
// Emit pool is ready
self.emit("poolReady");
}
}

@@ -72,3 +120,7 @@ }

// Start method, will throw error if no listeners are available
// Pass in an instance of the listener that contains the api for
// finding callbacks for a given message etc.
ConnectionPool.prototype.start = function() {
var self = this;
if(this.listeners("poolReady").length == 0) {

@@ -78,10 +130,29 @@ throw "pool must have at least one listener ready that responds to the [poolReady] event";

// Set pool state to connecting
this._poolState = 'connecting';
// Let's boot up all the instances
for(var i = 0; i < this.socketOptions.poolSize; i++) {
for(var i = 0; i < this.socketOptions.poolSize; i++) {
// Create a new connection instance
var connection = new Connection(i, this.socketOptions);
var connection = new Connection(this.connectionId++, this.socketOptions);
// Add connection to list of waiting connections
this.waitingToOpen[connection.id] = connection;
// Add a connection handler
this.waitingToOpen[connection.id] = connection;
connection.on("connect", connectHandler(this));
connection.on("error", connectHandler(this));
connection.on("close", connectHandler(this));
connection.on("end", connectHandler(this));
connection.on("timeout", connectHandler(this));
connection.on("parseError", function(err) {
// Set pool type to not connected
self._poolState = 'not connected';
// Only close the connection if it's still connected
if(self.isConnected()) self.stop();
// Emit the error
self.emit("parseError", err);
});
connection.on("message", function(message) {
self.emit("message", message);
});
// Start connection

@@ -92,11 +163,71 @@ connection.start();

// Restart a connection pool (on a close the pool might be in a wrong state)
ConnectionPool.prototype.restart = function() {
// Close all connections
this.stop();
// Now restart the pool
this.start();
}
// Stop the connections in the pool
ConnectionPool.prototype.stop = function() {
// Set not connected
this._poolState = 'not connected';
// Get all open connections
var keys = Object.keys(this.openConnections);
// Force close all open sockets
for(var i = 0; i < keys.length; i++) {
this.openConnections[keys[i]].close();
}
// Get all error connections
var keys = Object.keys(this.connectionsWithErrors);
// Force close all error sockets
for(var i = 0; i < keys.length; i++) {
this.connectionsWithErrors[keys[i]].close();
}
// Get all waiting to open connections
var keys = Object.keys(this.waitingToOpen);
// Force close all waiting sockets
for(var i = 0; i < keys.length; i++) {
this.waitingToOpen[keys[i]].close();
}
// Clear out all the connection variables
this.waitingToOpen = {};
this.connectionsWithErrors = {};
this.openConnections = {};
// Emit a close event so people can track the event
this.emit("close");
}
// Check the status of the connection
ConnectionPool.prototype.isConnected = function() {
return Object.keys(this.waitingToOpen).length == 0
&& Object.keys(this.connectionsWithErrors).length == 0
&& Object.keys(this.openConnections).length > 0 && this._poolState === 'connected'
&& this.openConnections[Object.keys(this.openConnections)[0]].isConnected();
}
// Checkout a connection from the pool for usage, or grab a specific pool instance
ConnectionPool.prototype.checkoutConnection = function(id) {
// If we have an id return that specific connection
if(id != null) return this.openConnections[id];
// Otherwise let's pick one using roundrobin
var keys = Object.keys(this.openConnections);
return this.openConnections[(keys[(this.currentConnectionIndex++ % keys.length)])]
}
ConnectionPool.prototype.getAllConnections = function() {
return this.openConnections;
}
// Remove all non-needed event listeners
ConnectionPool.prototype.removeAllEventListeners = function() {
this.removeAllListeners("close");
this.removeAllListeners("error");
}

@@ -117,1 +248,9 @@

var utils = require('./connection_utils'),
inherits = require('util').inherits,
net = require('net'),
binary_utils = require('../bson/binary_utils'),
EventEmitter = require("events").EventEmitter;
debug = require('util').debug,
inspect = require('util').inspect,
EventEmitter = require('events').EventEmitter,
inherits = require('util').inherits,
binaryutils = require('../bson/binary_utils');
// Set max bson size
const DEFAULT_MAX_BSON_SIZE = 4 * 1024 * 1024 * 4 * 3;
var Connection = exports.Connection = function(id, socketOptions) {
// Store all socket options
this.socketOptions = socketOptions;
this.socketOptions = socketOptions ? socketOptions : {host:'localhost', port:27017};
// Id for the connection

@@ -19,2 +25,3 @@ this.id = id;

this.maxBsonSize = socketOptions.maxBsonSize ? socketOptions.maxBsonSize : DEFAULT_MAX_BSON_SIZE;
// Contains the current message bytes

@@ -28,2 +35,5 @@ this.buffer = null;

this.stubBuffer = 0;
// Just keeps list of events we allow
resetHandlers(this, false);
}

@@ -35,4 +45,4 @@

Connection.prototype.start = function() {
console.dir(this.socketOptions)
// Set up event emitter
EventEmitter.call(this);
// Create new connection instance

@@ -42,4 +52,4 @@ this.connection = net.createConnection(this.socketOptions.port, this.socketOptions.host);

this.connection.on("connect", connectHandler(this));
this.connection.on("data", dataHandler(this));
this.connection.on("end", endHandler(this));
this.connection.on("data", createDataHandler(this));
// this.connection.on("end", endHandler(this));
this.connection.on("timeout", timeoutHandler(this));

@@ -51,2 +61,45 @@ this.connection.on("drain", drainHandler(this));

// Check if the sockets are live
Connection.prototype.isConnected = function() {
return this.connected;
}
// Write the data out to the socket
Connection.prototype.write = function(command, callback) {
try {
// If we have a list off commands to be executed on the same socket
if(Array.isArray(command)) {
for(var i = 0; i < command.length; i++) {
var t = this.connection.write(command[i].toBinary());
}
} else {
var r = this.connection.write(command.toBinary());
}
} catch (err) {
if(typeof callback === 'function') callback(err);
}
}
// Force the closure of the connection
Connection.prototype.close = function() {
// clear out all the listeners
resetHandlers(this, true);
// destroy connection
this.connection.destroy();
}
// Reset all handlers
var resetHandlers = function(self, clearListeners) {
self.eventHandlers = {error:[], connect:[], close:[], end:[], timeout:[], parseError:[], message:[]};
// If we want to clear all the listeners
if(clearListeners) {
var keys = Object.keys(self.eventHandlers);
// Remove all listeners
for(var i = 0; i < keys.length; i++) {
self.connection.removeAllListeners(keys[i]);
}
}
}
//

@@ -60,3 +113,3 @@ // Handlers

// Set options on the socket
this.setEncoding(self.socketOptions.encoding);
// this.setEncoding(self.socketOptions.encoding);
this.setTimeout(self.socketOptions.timeout);

@@ -72,11 +125,10 @@ this.setNoDelay(self.socketOptions.noDelay);

// Emit the connect event with no error
self.emit("connect", null);
self.emit("connect", null, self);
}
}
var dataHandler = function(self) {
var createDataHandler = exports.Connection.createDataHandler = function(self) {
// We need to handle the parsing of the data
// and emit the messages when there is a complete one
return function(data) {
// Parse until we are done with the data

@@ -88,17 +140,16 @@ while(data.length > 0) {

var remainingBytesToRead = self.sizeOfMessage - self.bytesRead;
// Check if the current chunk contains the rest of the message
if(remainingBytesToRead > data.length) {
// Copy the new data into the exiting buffer (should have been allocated when we know the message size)
data.copy(self.buffer, self.bytesRead);
// Adjust the number of bytes read so it point to the correct index in the buffer
self.bytesRead = self.bytesRead + remainingBytesToRead;
self.bytesRead = self.bytesRead + data.length;
// Reset state of buffer
data = new Buffer(0);
} else {
// Copy the missing part of the data into our current buffer
data.copy(self.buffer, self.bytesRead, 0, remainingBytesToRead);
// Slice the overflow into a new buffer that we will then re-parse
data = data.slice(self.sizeOfMessage);
data = data.slice(remainingBytesToRead);

@@ -108,9 +159,16 @@ // Emit current complete message

self.emit("message", self.buffer);
} finally {
// Reset state of buffer
self.buffer = null;
self.sizeOfMessage = 0;
self.bytesRead = 0;
self.stubBuffer = null;
} catch(err) {
// We got a parse Error fire it off then keep going
self.emit("parseError", {err:"socketHandler", trace:err, bin:buffer, parseState:{
sizeOfMessage:self.sizeOfMessage,
bytesRead:self.bytesRead,
stubBuffer:self.stubBuffer}});
}
// Reset state of buffer
self.buffer = null;
self.sizeOfMessage = 0;
self.bytesRead = 0;
self.stubBuffer = null;
}

@@ -124,20 +182,15 @@ } else {

if(self.stubBuffer.length + data.length > 4) {
// Prepad the data
var newData = new Buffer(self.stubBuffer.length + data.length);
self.stubBuffer.copy(newData, 0);
data.copy(newData, self.stubBuffer.length);
// Reassign for parsing
data = newData;
// Create temp buffer to keep the 4 bytes for the size
var messageSizeBuffer = new Buffer(4);
// Copy in the stubBuffer data
self.stubBuffer.copy(messageSizeBuffer, 0);
// Copy the remaining (4 - stubBuffer.length) bytes needed to determine the size
data.copy(messageSizeBuffer, self.stubBuffer.length, 0, (4 - self.stubBuffer.length))
// Determine the message Size
self.sizeOfMessage = binary_utils.decodeUInt32(messageSizeBuffer, 0)
// Do a single allocation for the buffer
self.buffer = new Buffer(self.sizeOfMessage);
// Set bytes read
self.bytesRead = 4;
// Slice the data so we can keep parsing
data = data.slice((4 - self.stubBuffer.length));
// Null out stub buffer
// Reset state of buffer
self.buffer = null;
self.sizeOfMessage = 0;
self.bytesRead = 0;
self.stubBuffer = null;
} else {

@@ -155,11 +208,9 @@

} else {
if(data.length > 4) {
// Retrieve the message size
var sizeOfMessage = binaryutils.decodeUInt32(data, 0);
// Write the data into the buffer
if(sizeOfMessage > data.length) {
self.buffer = new Buffer(sizeOfMessage);
// Ensure that the size of message is larger than 0 and less than the max allowed
if(sizeOfMessage > 4 && sizeOfMessage < self.maxBsonSize && sizeOfMessage > data.length) {
self.buffer = new Buffer(sizeOfMessage);
// Copy all the data into the buffer

@@ -176,6 +227,5 @@ data.copy(self.buffer, 0);

} else if(sizeOfMessage == data.length) {
} else if(sizeOfMessage > 4 && sizeOfMessage < self.maxBsonSize && sizeOfMessage == data.length) {
try {
emit("message", data);
} finally {
self.emit("message", data);
// Reset state of buffer

@@ -186,8 +236,41 @@ self.buffer = null;

self.stubBuffer = null;
}
}
// Exit parsing loop
data = new Buffer(0);
} catch (err) {
// We got a parse Error fire it off then keep going
self.emit("parseError", {err:"socketHandler", trace:err, bin:self.buffer, parseState:{
sizeOfMessage:self.sizeOfMessage,
bytesRead:self.bytesRead,
stubBuffer:self.stubBuffer}});
}
} else if(sizeOfMessage <= 4 || sizeOfMessage > self.maxBsonSize) {
// We got a parse Error fire it off then keep going
self.emit("parseError", {err:"socketHandler", trace:null, bin:data, parseState:{
sizeOfMessage:sizeOfMessage,
bytesRead:0,
buffer:null,
stubBuffer:null}});
// Clear out the state of the parser
self.buffer = null;
self.sizeOfMessage = 0;
self.bytesRead = 0;
self.stubBuffer = null;
// Exit parsing loop
data = new Buffer(0);
} else {
self.emit("message", data.slice(0, sizeOfMessage));
// Reset state of buffer
self.buffer = null;
self.sizeOfMessage = 0;
self.bytesRead = 0;
self.stubBuffer = null;
// Copy rest of message
data = data.slice(sizeOfMessage);
}
} else {
// Create a buffer that contains the space for the non-complete message

@@ -203,5 +286,2 @@ self.stubBuffer = new Buffer(data.length)

}
console.log("========================= data");
}

@@ -212,3 +292,6 @@ }

return function() {
console.log("========================= end");
// Set connected to false
self.connected = false;
// Emit end event
self.emit("end", {err: 'connection received Fin packet from [' + self.socketOptions.host + ':' + self.socketOptions.port + ']'}, self);
}

@@ -219,3 +302,3 @@ }

return function() {
console.log("========================= timeout");
self.emit("end", {err: 'connection to [' + self.socketOptions.host + ':' + self.socketOptions.port + '] timed out'}, self);
}

@@ -226,3 +309,2 @@ }

return function() {
console.log("========================= drain");
}

@@ -233,3 +315,6 @@ }

return function(err) {
console.log("========================= error");
// Set connected to false
self.connected = false;
// Emit error
self.emit("error", {err: 'failed to connect to [' + self.socketOptions.host + ':' + self.socketOptions.port + ']'}, self);
}

@@ -240,13 +325,15 @@ }

return function(hadError) {
// Set connected to false
self.connected = false;
// If we have an error during the connection phase
if(hadError && !self.connected) {
self.emit("connect", {err: 'failed to connect to [' + self.socketOptions.host + ':' + self.socketOptions.port + ']'}, self);
if(hadError && !self.connected) {
self.emit("error", {err: 'failed to connect to [' + self.socketOptions.host + ':' + self.socketOptions.port + ']'}, self);
} else {
self.emit("close", {err: 'connection closed to [' + self.socketOptions.host + ':' + self.socketOptions.port + ']'}, self);
}
console.log("========================= close");
}
}
// Some basic defaults
Connection.DEFAULT_PORT = 27017;

@@ -253,0 +340,0 @@

@@ -43,3 +43,3 @@ var QueryCommand = require('./commands/query_command').QueryCommand,

*/
var Cursor = exports.Cursor = function(db, collection, selector, fields, skip, limit, sort, hint, explain, snapshot, timeout, tailable, batchSize, slaveOk) {
var Cursor = exports.Cursor = function(db, collection, selector, fields, skip, limit, sort, hint, explain, snapshot, timeout, tailable, batchSize, slaveOk, raw) {
this.db = db;

@@ -59,2 +59,3 @@ this.collection = collection;

this.slaveOk = slaveOk == null ? collection.slaveOk : slaveOk;
this.raw = raw == null ? false : raw;

@@ -126,3 +127,3 @@ this.totalNumberOfRecords = 0;

this.each(function(err, item) {
this.each(function(err, item) {
if(err != null) return callback(err, null);

@@ -135,2 +136,3 @@

items = null;
self.items = [];
}

@@ -394,3 +396,2 @@ });

if(this.snapshot != null) specialSelector['$snapshot'] = true;
return new QueryCommand(this.db, this.collectionName, queryOptions, this.skipValue, numberToReturn, specialSelector, this.fields);

@@ -486,3 +487,3 @@ } else {

self.db.executeCommand(cmd, {read:true}, commandHandler);
self.db._executeQueryCommand(cmd, {read:true, raw:self.raw}, commandHandler);
commandHandler = null;

@@ -521,3 +522,3 @@ } else if(self.items.length) {

// Execute the command
self.db.executeCommand(getMoreCommand, {read:true}, function(err, result) {
self.db._executeQueryCommand(getMoreCommand, {read:true, raw:self.raw}, function(err, result) {
try {

@@ -605,3 +606,3 @@ if(err != null) callback(err, null);

function execute(command) {
self.db.executeCommand(command, {read:true}, function(err,result) {
self.db._executeQueryCommand(command, {read:true, raw:self.raw}, function(err,result) {
if(err) {

@@ -666,14 +667,14 @@ stream.emit('error', err);

var command = new KillCursorCommand(this.db, [this.cursorId]);
this.db.executeCommand(command, {read:true}, null);
this.db._executeQueryCommand(command, {read:true, raw:self.raw}, null);
} catch(err) {}
}
// Reset cursor id
this.cursorId = self.db.bson_serializer.Long.fromInt(0);
// Set to closed status
this.state = Cursor.CLOSED;
if(callback) {
process.nextTick(function(){
callback(null, self);
self.items = null;
});
callback(null, self);
self.items = [];
}

@@ -680,0 +681,0 @@

@@ -6,6 +6,5 @@ var QueryCommand = require('./commands/query_command').QueryCommand,

Admin = require('./admin').Admin,
Connection = require('./connection').Connection,
Collection = require('./collection').Collection,
Server = require('./connections/server').Server,
ReplSetServers = require('./connections/repl_set_servers').ReplSetServers,
Server = require('./connection/server').Server,
ReplSetServers = require('./connection/repl_set_servers').ReplSetServers,
Cursor = require('./cursor').Cursor,

@@ -48,2 +47,3 @@ EventEmitter = require('events').EventEmitter,

this.auths = [];
this.logger = this.options.logger != null

@@ -61,25 +61,76 @@ && (typeof this.options.logger.debug == 'function')

// Add a listener for the reconnect event
this.on("reconnect", function() {
// Number of current auths
var authLength = self.auths.length;
var numberOfReadyAuth = 0;
this.tag = new Date().getTime();
// Contains the callbacks
this._mongodbHandlers = {_mongodbCallbacks : {}, _notReplied : {}};
// Just keeps list of events we allow
this.eventHandlers = {error:[], parseError:[], poolReady:[], message:[], close:[]};
if(authLength > 0) {
// If we have any auths fire off the auth message to all the connections
for(var i = 0; i < authLength; i++) {
// Execute auth commands
self.authenticate(self.auths[i].username, self.auths[i].password, function(err, result) {
numberOfReadyAuth = numberOfReadyAuth + 1;
// Controls serialization options
this.serializeFunctions = this.options.serializeFunctions != null ? this.options.serializeFunctions : false;
// Raw mode
this.raw = this.options.raw != null ? this.options.raw : false;
// Retry information
this.retryMiliSeconds = this.options.retryMiliSeconds != null ? this.options.retryMiliSeconds : 5000;
this.numberOfRetries = this.options.numberOfRetries != null ? this.options.numberOfRetries : 5;
// Reaper information
this.reaperInterval = this.options.reaperInterval != null ? this.options.reaperInterval : 1000;
this.reaperTimeout = this.options.reaperTimeout != null ? this.options.reaperTimeout : 30000;
// Start reaper, cleans up timed out calls
this.reaperIntervalId = setInterval(reaper(this, this.reaperTimeout), this.reaperInterval);
};
if(numberOfReadyAuth == self.auths.length) {
self.serverConfig.emit("resend");
}
});
}
} else {
self.serverConfig.emit("resend");
// Does forced cleanup of callbacks if a call never returns
var reaper = function(dbInstance, timeout) {
return function() {
// Only trigger reaper if it's still valid and we have a proper connection pool
if(dbInstance.reaperIntervalId != null) {
// Current time
var currentTime = new Date().getTime();
// If it's no longer connected clear out the interval
if(dbInstance.serverConfig.connectionPool != null && !dbInstance.serverConfig.isConnected() && dbInstance.reaperIntervalId != null) {
// Clear the interval
clearInterval(dbInstance.reaperIntervalId);
// this._mongodbHandlers = {_mongodbCallbacks : {}, _notReplied : {}};
// Trigger all remaining callbacks with timeout error
if(dbInstance._mongodbHandlers != null && dbInstance._mongodbHandlers._notReplied != null) {
var keys = Object.keys(dbInstance._mongodbHandlers._notReplied);
// Iterate over all callbacks
for(var i = 0; i < keys.length; i++) {
// Get callback
var callback = dbInstance._mongodbHandlers._mongodbCallbacks[keys[i]];
// Cleanup
delete dbInstance._mongodbHandlers._notReplied[keys[i]];
delete dbInstance._mongodbHandlers._mongodbCallbacks[keys[i]];
// Perform callback
callback(new Error("operation timed out"), null);
}
}
} else {
// Trigger all callbacks that went over timeout period
if(dbInstance._mongodbHandlers != null && dbInstance._mongodbHandlers._notReplied != null) {
var keys = Object.keys(dbInstance._mongodbHandlers._notReplied);
// Iterate over all callbacks
for(var i = 0; i < keys.length; i++) {
// Get info element
var info = dbInstance._mongodbHandlers._notReplied[keys[i]];
// If it's timed out let's remove the callback and return an error
if((currentTime - info.start) > timeout) {
// Get callback
var callback = dbInstance._mongodbHandlers._mongodbCallbacks[keys[i]];
// Cleanup
delete dbInstance._mongodbHandlers._notReplied[keys[i]];
delete dbInstance._mongodbHandlers._mongodbCallbacks[keys[i]];
// Perform callback
callback(new Error("operation timed out"), null);
}
}
}
}
}
});
};
}
}

@@ -102,8 +153,12 @@ function validateDatabaseName(databaseName) {

if(self.serverConfig instanceof Server || self.serverConfig instanceof ReplSetServers) {
self.serverConfig.connect(self, function(err, result) {
if(err != null) return callback(err, null);
self.serverConfig.connect(self, {firstCall: true}, function(err, result) {
if(err != null) {
// Clear reaper interval
if(self.reaperIntervalId != null) clearInterval(self.reaperIntervalId);
// Return error from connection
return callback(err, null);
}
// Callback
return callback(null, self);
});
} else {

@@ -114,7 +169,27 @@ return callback(Error("Server parameter must be of type Server or ReplSetServers"), null);

Db.prototype.close = function(callback) {
Db.prototype.db = function(dbName) {
// Create a new db instance
var newDbInstance = new Db(dbName, this.serverConfig, this.options);
// Add the instance to the list of approved db instances
var allServerInstances = this.serverConfig.allServerInstances();
// Add ourselves to all server callback instances
for(var i = 0; i < allServerInstances.length; i++) {
var server = allServerInstances[i];
server.dbInstances.push(newDbInstance);
}
// Return new db object
return newDbInstance;
}
Db.prototype.close = function(callback) {
// Clear reaperId if it's set
if(this.reaperIntervalId != null) {
clearInterval(this.reaperIntervalId);
}
// Remove all listeners and close the connection
this.serverConfig.removeAllListeners("reconnect");
this.serverConfig.close(callback);
// Emit the close event
if(typeof callback !== 'function') this.emit("close");
// Remove all listeners
this.removeAllEventListeners();
// Clear out state of the connection

@@ -276,27 +351,12 @@ this.state = "notConnected";

var logoutCommand = DbCommand.logoutCommand(self, {logout:1, socket:options['socket']});
// For all the connections let's execute the command
var rawConnections = self.serverConfig.allRawConnections();
var numberOfExpectedReturns = rawConnections.length;
for(var index = 0; index < numberOfExpectedReturns; index++) {
// Execute the logout on all raw connections
self.executeCommand(logoutCommand, {writer: rawConnections[index].connection}, function(err, result) {
// Ajust the number of expected results
numberOfExpectedReturns = numberOfExpectedReturns - 1;
// If we are done let's evaluate
if(numberOfExpectedReturns <= 0) {
// Reset auth
self.auths = [];
// Handle any errors
if(err == null && result.documents[0].ok == 1) {
callback(null, true);
} else {
err != null ? callback(err, false) : callback(new Error(result.documents[0].errmsg), false);
}
}
});
}
self._executeQueryCommand(logoutCommand, {onAll:true}, function(err, result) {
// Reset auth
self.auths = [];
// Handle any errors
if(err == null && result.documents[0].ok == 1) {
callback(null, true);
} else {
err != null ? callback(err, false) : callback(new Error(result.documents[0].errmsg), false);
}
});
}

@@ -309,45 +369,40 @@

var self = this;
// Add the auth details to the connection for reconnects or update existing if any
var found = false;
for(var i = 0; i < self.auths.length; i++) {
// If we have found an existing auth, update the password
if(self.auths[i].username == username) {
found = true;
self.auths[i].password = password;
}
}
// Push the new auth if we have no previous record
if(!found) self.auths.push({'username':username, 'password':password});
// Figure out the number of times we need to trigger
var rawConnections = self.serverConfig.allRawConnections();
var numberOfExpectedReturns = rawConnections.length;
// Execute the commands
for(var i = 0; i < numberOfExpectedReturns; i++) {
// Execute command
var createNonceCallback = function(index) {
return function(err, reply) {
if(err == null) {
// Nonce used to make authentication request with md5 hash
var nonce = reply.documents[0].nonce;
// Execute command
self.executeCommand(DbCommand.createAuthenticationCommand(self, username, password, nonce), {writer: rawConnections[index].connection}, function(err, result) {
// Ajust the number of expected results
numberOfExpectedReturns = numberOfExpectedReturns - 1;
// If we are done let's evaluate
if(numberOfExpectedReturns <= 0) {
if(err == null && result.documents[0].ok == 1) {
callback(null, true);
} else {
err != null ? callback(err, false) : callback(new Error(result.documents[0].errmsg), false);
}
}
});
} else {
callback(err, null);
self.auths = [{'username':username, 'password':password}];
// Get the amount of connections in the pool to ensure we have authenticated all comments
var numberOfConnections = Object.keys(this.serverConfig.allRawConnections()).length;
var errorObject = null;
// Execute all four
this._executeQueryCommand(DbCommand.createGetNonceCommand(self), {onAll:true}, function(err, result, connection) {
// Execute on all the connections
if(err == null) {
// Nonce used to make authentication request with md5 hash
var nonce = result.documents[0].nonce;
// Execute command
self._executeQueryCommand(DbCommand.createAuthenticationCommand(self, username, password, nonce), {connection:connection}, function(err, result) {
// Ensure we save any error
if(err) {
errorObject = err;
} else if(result.documents[0].err != null || result.documents[0].errmsg != null){
errorObject = self.wrap(result.documents[0]);
}
}
}
this.executeCommand(DbCommand.createGetNonceCommand(self), {writer: rawConnections[i].connection}, createNonceCallback(i));
}
// Count down
numberOfConnections = numberOfConnections - 1;
// If we are done with the callbacks return
if(numberOfConnections <= 0) {
if(errorObject == null && result.documents[0].ok == 1) {
callback(errorObject, true);
} else {
callback(errorObject, false);
}
}
});
} else {
}
});
};

@@ -413,3 +468,3 @@

try {
var collection = new Collection(self, collectionName, self.pkFactory);
var collection = new Collection(self, collectionName, self.pkFactory, options);
} catch(err) {

@@ -420,8 +475,8 @@ return callback(err, null);

}
// Create a new collection and return it
self.executeCommand(DbCommand.createCreateCollectionCommand(self, collectionName, options), {read:false, safe:true}, function(err, result) {
self._executeQueryCommand(DbCommand.createCreateCollectionCommand(self, collectionName, options), {read:false, safe:true}, function(err, result) {
if(err == null && result.documents[0].ok == 1) {
try {
var collection = new Collection(self, collectionName, self.pkFactory);
var collection = new Collection(self, collectionName, self.pkFactory, options);
} catch(err) {

@@ -447,3 +502,3 @@ return callback(err, null);

Db.prototype.dropCollection = function(collectionName, callback) {
this.executeCommand(DbCommand.createDropCollectionCommand(this, collectionName), function(err, result) {
this._executeQueryCommand(DbCommand.createDropCollectionCommand(this, collectionName), function(err, result) {
if(err == null && result.documents[0].ok == 1) {

@@ -461,3 +516,3 @@ if(callback != null) return callback(null, true);

Db.prototype.renameCollection = function(fromCollection, toCollection, callback) {
this.executeCommand(DbCommand.createRenameCollectionCommand(this, fromCollection, toCollection), function(err, doc) { callback(err, doc); });
this._executeQueryCommand(DbCommand.createRenameCollectionCommand(this, fromCollection, toCollection), function(err, doc) { callback(err, doc); });
};

@@ -475,3 +530,3 @@

this.executeCommand(DbCommand.createGetLastErrorCommand(options, this), connectionOptions, function(err, error) {
this._executeQueryCommand(DbCommand.createGetLastErrorCommand(options, this), connectionOptions, function(err, error) {
callback(err, error && error.documents);

@@ -494,3 +549,3 @@ });

Db.prototype.lastStatus = function(callback) {
this.executeCommand(DbCommand.createGetLastStatusCommand(this), callback);
this._executeQueryCommand(DbCommand.createGetLastStatusCommand(this), callback);
};

@@ -502,3 +557,3 @@

Db.prototype.previousErrors = function(callback) {
this.executeCommand(DbCommand.createGetPreviousErrorsCommand(this), function(err, error) {
this._executeQueryCommand(DbCommand.createGetPreviousErrorsCommand(this), function(err, error) {
callback(err, error.documents);

@@ -513,3 +568,3 @@ });

if(callback == null) { callback = options; options = {}; }
this.executeCommand(DbCommand.createDbCommand(this, command_hash), options, callback);
this._executeQueryCommand(DbCommand.createDbCommand(this, command_hash, options), options, callback);
};

@@ -521,3 +576,3 @@

Db.prototype.executeDbAdminCommand = function(command_hash, callback) {
this.executeCommand(DbCommand.createAdminDbCommand(this, command_hash), callback);
this._executeQueryCommand(DbCommand.createAdminDbCommand(this, command_hash), callback);
};

@@ -529,3 +584,3 @@

Db.prototype.resetErrorHistory = function(callback) {
this.executeCommand(DbCommand.createResetErrorHistoryCommand(this), callback);
this._executeQueryCommand(DbCommand.createResetErrorHistoryCommand(this), callback);
};

@@ -539,3 +594,3 @@

var command = DbCommand.createCreateIndexCommand(this, collectionName, fieldOrSpec, options);
this.executeCommand(command, {read:false, safe:true}, function(err, result) {
this._executeInsertCommand(command, {read:false, safe:true}, function(err, result) {
if(err != null) return callback(err, null);

@@ -565,3 +620,3 @@

if(!collectionInfo[index_name]) {
self.executeCommand(command, {read:false, safe:true}, function(err, result) {
self._executeInsertCommand(command, {read:false, safe:true}, function(err, result) {
if(err != null) return callback(err, null);

@@ -586,3 +641,3 @@

Db.prototype.cursorInfo = function(callback) {
this.executeCommand(DbCommand.createDbCommand(this, {'cursorInfo':1}), function(err, result) {
this._executeQueryCommand(DbCommand.createDbCommand(this, {'cursorInfo':1}), function(err, result) {
callback(err, result.documents[0]);

@@ -596,3 +651,3 @@ });

Db.prototype.dropIndex = function(collectionName, indexName, callback) {
this.executeCommand(DbCommand.createDropIndexCommand(this, collectionName, indexName), callback);
this._executeQueryCommand(DbCommand.createDropIndexCommand(this, collectionName, indexName), callback);
};

@@ -642,3 +697,3 @@

Db.prototype.dropDatabase = function(callback) {
this.executeCommand(DbCommand.createDropDatabaseCommand(this), function(err, result) {
this._executeQueryCommand(DbCommand.createDropDatabaseCommand(this), function(err, result) {
callback(err, result);

@@ -648,175 +703,255 @@ });

/**
Execute db command
**/
Db.prototype.executeCommand = function(db_command, options, callback) {
var self = this;
// Register a handler
Db.prototype._registerHandler = function(db_command, raw, connection, callback) {
// Add the callback to the list of handlers
this._mongodbHandlers._mongodbCallbacks[db_command.getRequestId().toString()] = callback;
// Add the information about the reply
this._mongodbHandlers._notReplied[db_command.getRequestId().toString()] = {start: new Date().getTime(), 'raw': raw, 'connection':connection};
}
// Remove a handler
Db.prototype._removeHandler = function(db_command) {
var id = typeof db_command === 'number' || typeof db_command === 'string' ? db_command.toString() : db_command.getRequestId().toString();
// Ensure we have an entry (might have been removed by the reaper)
if(this._mongodbHandlers != null && this._mongodbHandlers._mongodbCallbacks[id] != null) {
var callback = this._mongodbHandlers._mongodbCallbacks[id];
var info = this._mongodbHandlers._notReplied[id];
// Remove the handler
delete this._mongodbHandlers._mongodbCallbacks[id];
delete this._mongodbHandlers._notReplied[id]
// Return the callback
return {'callback':callback, 'info':info};
} else {
return null;
}
}
Db.prototype._findHandler = function(id) {
var callback = this._mongodbHandlers._mongodbCallbacks[id.toString()];
var info = this._mongodbHandlers._notReplied[id.toString()];
// Return the callback
return {'callback':callback, 'info':info};
}
var __executeQueryCommand = function(self, db_command, options, callback) {
// Options unpacking
var read = options['read'] != null ? options['read'] : false;
var raw = options['raw'] != null ? options['raw'] : self.raw;
var onAll = options['onAll'] != null ? options['onAll'] : false;
var specifiedConnection = options['connection'] != null ? options['connection'] : null;
// If we got a callback object
if(callback instanceof Function && !onAll) {
// Let's the out outgoing request Id
var requestId = db_command.getRequestId().toString();
// Fetch either a reader or writer dependent on the specified read option
var connection = read ? self.serverConfig.checkoutReader() : self.serverConfig.checkoutWriter();
// Override connection if needed
connection = specifiedConnection != null ? specifiedConnection : connection;
// Ensure we have a valid connection
if(connection == null) return callback(new Error("no open connections"));
// Register the handler in the data structure
self._registerHandler(db_command, raw, connection, callback);
var args = Array.prototype.slice.call(arguments, 1);
callback = args.pop();
options = args.length ? args.shift() : {};
// Write the message out and handle any errors if there are any
connection.write(db_command, function(err) {
if(err != null) {
// Clean up listener and return error
var callbackInstance = self._removeHandler(db_command);
// Only call if the reaper has not removed it
if(callbackInstance != null) {
callbackInstance.callback(err);
}
}
});
} else if(callback instanceof Function && onAll) {
var connections = self.serverConfig.allRawConnections();
var keys = Object.keys(connections);
var numberOfEntries = keys.length;
// Options unpacking
var read = options['read'] != null ? options['read'] : false;
var safe = options['safe'] != null ? options['safe'] : false;
// Signals both commands return a value
var allReturn = options['allReturn'] != null ? options['allReturn'] : false;
// Let's us pass in a writer to force the use of a connection (used for admin where we need to peform 2 calls against the same connection)
var rawConnection = options['writer'] != null ? options['writer'] : null;
// Go through all the connections
for(var i = 0; i < keys.length; i++) {
// Fetch a connection
var connection = connections[keys[i]];
// Override connection if needed
connection = specifiedConnection != null ? specifiedConnection : connection;
// Ensure we have a valid connection
if(connection == null) return callback(new Error("no open connections"));
var errorCommand = null;
if(safe) {
errorCommand = DbCommand.createGetLastErrorCommand(safe, this);
}
// Contains the command we need to listen to
var listenToCommand = null;
// If we have a callback execute
if(callback instanceof Function) {
listenToCommand = errorCommand != null ? errorCommand : db_command;
// If we need to return all the results let's do it
if(allReturn && safe != null && safe != false) {
// Handle the 2 callbacks ensuring we trap all errors
var handleBothCommandsFunction = function() {
// Number of expected results before a callback
var numberOfResultsToBeReturned = 2;
var results = [];
// Handle the callbacks
return function(err, result) {
// Adjust number of expected results
numberOfResultsToBeReturned = numberOfResultsToBeReturned - 1;
// Add results and error
results.push({err:err, result:result});
// Return if we got all the responses
if(numberOfResultsToBeReturned <= 0) {
// Detect if there was an error
var errorIndex = 0;
// Find the first error if any
for(var i = 0; i < results.length; i++) {
// Check for error
if(results[i].err != null || results[i].result.documents[0].err != null) {
errorIndex = i;
break;
}
}
// If the first command fails return that and ignore the next command result
if(errorIndex > 0) {
return callback(results[errorIndex].result.documents[0], results[errorIndex].result);
} else if(results[0].err || !results[0].result.ok) {
return callback(results[0].err, results[0].result);
} else {
// First command went ok, return second command results
return callback(results[1].err, results[1].result);
}
}
}
}();
// Register the handler in the data structure
self._registerHandler(db_command, raw, connection, callback);
// Write the message out
connection.write(db_command, function(err) {
// Adjust the number of entries we need to process
numberOfEntries = numberOfEntries - 1;
// Remove listener
if(err != null) {
// Clean up listener and return error
self._removeHandler(db_command);
}
// Fire actual command
this.on(db_command.getRequestId().toString(), handleBothCommandsFunction);
// Fire the error command
this.on(errorCommand.getRequestId().toString(), handleBothCommandsFunction);
} else {
// Add the callback to the list of callbacks by the request id (mapping outgoing messages to correct callbacks)
this.on(listenToCommand.getRequestId().toString(), callback);
if(self.serverConfig.primary != null) {
this.notReplied[listenToCommand.getRequestId().toString()] = new Date().getTime();
}
}
// No more entries to process callback with the error
if(numberOfEntries <= 0) {
callback(err);
}
});
// Update the db_command request id
db_command.updateRequestId();
}
} else {
// Let's the out outgoing request Id
var requestId = db_command.getRequestId().toString();
// Fetch either a reader or writer dependent on the specified read option
var connection = read ? self.serverConfig.checkoutReader() : self.serverConfig.checkoutWriter();
// Override connection if needed
connection = specifiedConnection != null ? specifiedConnection : connection;
// Ensure we have a valid connection
if(connection == null) return null;
// Write the message out
connection.write(db_command, function(err) {
if(err != null) {
// Emit the error
self.emit("error", err);
}
});
}
}
try{
// Attempt forcing a reconnect if we have a replicaset server
if(self.serverConfig instanceof ReplSetServers && !self.serverConfig.isConnected()) {
// Initialize
self.isInitializing = true;
// Number of retries
var retries = self.serverConfig.retries;
// Attempts a reconnect
var reconnectAttempt = function() {
// Try reconnect
self.serverConfig.connect(self, function(err, result) {
// Initialize
self.isInitializing = true;
// Set retries
retries = retries - 1;
// If we fail retry the connec
if(err != null && retries > 0) {
// Wait an try again
setTimeout(reconnectAttempt, self.serverConfig.reconnectWait);
} else {
// Ensure we catch any errors happening and report them (especially important for replicaset servers)
try {
if(err != null && callback instanceof Function) return callback(err, null);
// for the other instances fire the message
var writer = read ? self.serverConfig.checkoutReader() : self.serverConfig.checkoutWriter();
// If we got safe set
if(errorCommand != null) {
writer.send([db_command, errorCommand], rawConnection);
var __retryCommandOnFailure = function(self, retryInMilliseconds, numberOfTimes, command, db_command, options, callback) {
// Number of retries done
var numberOfRetriesDone = numberOfTimes;
// The interval function triggers retries
var intervalId = setInterval(function() {
// Attemp a reconnect
self.serverConfig.connect(self, {firstCall: false}, function(err, result) {
// Adjust the number of retries done
numberOfRetriesDone = numberOfRetriesDone - 1;
// If we have no error, we are done
if(err != null && numberOfRetriesDone <= 0) {
// No more retries, clear interval retries and fire an error
clearInterval(intervalId);
callback(err, null);
} else if(err == null) {
// Clear retries and fire message
clearInterval(intervalId);
// If we have auths we need to replay them
if(Array.isArray(self.auths) && self.auths.length > 0) {
// Get number of auths we need to execute
var numberOfAuths = self.auths.length;
// Apply all auths
for(var i = 0; i < self.auths.length; i++) {
self.authenticate(self.auths[i].username, self.auths[i].password, function(err, authenticated) {
numberOfAuths = numberOfAuths - 1;
// If we have no more authentications to replay
if(numberOfAuths == 0) {
if(err != null || !authenticated) {
return callback(err, null);
} else {
writer.send(db_command, rawConnection);
}
} catch (err) {
// Set server config to disconnected if it's a replicaset
if(self.serverConfig instanceof ReplSetServers && err == "notConnected") {
// Just clear up all connections as we need to perform a complete reconnect for the call
self.serverConfig.disconnect();
command(self, db_command, options, callback);
}
// Signal an error
if(!(err instanceof Error)) err = new Error(err);
if(callback instanceof Function) {
if(errorCommand != null) delete self.notReplied[errorCommand.getRequestId().toString()];
return callback(err, null);
}
}
}
});
}
// Force a reconnect after self.serverConfig.reconnectWait seconds
setTimeout(reconnectAttempt, self.serverConfig.reconnectWait);
} else {
// for the other instances fire the message
var writer = read ? self.serverConfig.checkoutReader() : self.serverConfig.checkoutWriter();
// If we got safe set
if(errorCommand != null) {
writer.send([db_command, errorCommand], rawConnection);
})
}
} else {
writer.send(db_command, rawConnection);
}
}
} catch(err){
// Set server config to disconnected if it's a replicaset
if(self.serverConfig instanceof ReplSetServers && err == "notConnected") {
// Just clear up all connections as we need to perform a complete reconnect for the call
self.serverConfig.disconnect();
command(self, db_command, options, callback);
}
}
if(err == 'notConnected') {
delete self.notReplied[listenToCommand.getRequestId().toString()];
return callback(new Error(err), null);
}
});
}, retryInMilliseconds);
}
// Signal an error
if(!(err instanceof Error)) err = new Error(err);
if(callback instanceof Function) {
if(errorCommand != null) delete self.notReplied[errorCommand.getRequestId().toString()];
return callback(err, null);
}
// Return error object
return err;
}
/**
Execute db query command (not safe)
**/
Db.prototype._executeQueryCommand = function(db_command, options, callback) {
var self = this;
// Unpack the parameters
var args = Array.prototype.slice.call(arguments, 1);
callback = args.pop();
options = args.length ? args.shift() : {};
// If the pool is not connected, attemp to reconnect to send the message
if(!this.serverConfig.isConnected() && this.serverConfig.autoReconnect) {
__retryCommandOnFailure(this, this.retryMiliSeconds, this.numberOfRetries, __executeQueryCommand, db_command, options, callback);
} else {
__executeQueryCommand(self, db_command, options, callback)
}
};
var __executeInsertCommand = function(self, db_command, options, callback) {
// Always checkout a writer for this kind of operations
var connection = self.serverConfig.checkoutWriter();
var safe = options['safe'] != null ? options['safe'] : false;
var raw = options['raw'] != null ? options['raw'] : self.raw;
var specifiedConnection = options['connection'] != null ? options['connection'] : null;
// Override connection if needed
connection = specifiedConnection != null ? specifiedConnection : connection;
// Ensure we have a valid connection
if(callback instanceof Function) {
// Ensure we have a valid connection
if(connection == null) return callback(new Error("no open connections"));
// We are expecting a check right after the actual operation
if(safe != null && safe != false) {
// db command is now an array of commands (original command + lastError)
db_command = [db_command, DbCommand.createGetLastErrorCommand(safe, self)];
// Register the handler in the data structure
self._registerHandler(db_command[1], raw, connection, callback);
}
}
// If we have no callback and there is no connection
if(connection == null) return null;
// Write the message out
connection.write(db_command, function(err) {
// Return the callback if it's not a safe operation and the callback is defined
if(callback instanceof Function && (safe == null || safe == false)) {
callback(err, null);
} else if(callback instanceof Function){
// Clean up listener and return error
var callbackInstance = self._removeHandler(db_command[1]);
// Only call if the reaper has not removed it
if(callbackInstance != null) {
callbackInstance.callback(err, null);
}
} else {
self.emit("error", err);
}
});
}
/**
Execute an insert Command
**/
Db.prototype._executeInsertCommand = function(db_command, options, callback) {
var self = this;
// Unpack the parameters
var args = Array.prototype.slice.call(arguments, 1);
callback = args.pop();
options = args.length ? args.shift() : {};
// If the pool is not connected, attemp to reconnect to send the message
if(!this.serverConfig.isConnected() && this.serverConfig.autoReconnect) {
__retryCommandOnFailure(this, this.retryMiliSeconds, this.numberOfRetries, __executeInsertCommand, db_command, options, callback);
} else {
__executeInsertCommand(self, db_command, options, callback)
}
}
// Update command is the same
Db.prototype._executeUpdateCommand = Db.prototype._executeInsertCommand;
Db.prototype._executeRemoveCommand = Db.prototype._executeInsertCommand;
/**
* Wrap a Mongo error document into an Error instance
*/
Db.prototype.wrap = function(error) {
var e = new Error(error.err);
var e = new Error(error.err != null ? error.err : error.errmsg);
e.name = 'MongoError';

@@ -897,4 +1032,3 @@

var hostPort = h.split(':', 2);
return new Server(hostPort[0] || 'localhost', hostPort[1] || 27017,
serverOptions);
return new Server(hostPort[0] || 'localhost', hostPort[1] != null ? parseInt(hostPort[1]) : 27017, serverOptions);
});

@@ -928,1 +1062,9 @@

}
Db.prototype.removeAllEventListeners = function() {
this.removeAllListeners("close");
this.removeAllListeners("error");
this.removeAllListeners("parseError");
this.removeAllListeners("poolReady");
this.removeAllListeners("message");
}

@@ -930,4 +930,4 @@ /**

// Build query
var query = typeof fileIdObject == 'string' ? {'filename':fileIdObject} : {'_id':fileIdObject};
// Attempt to locate file
var query = (typeof fileIdObject == 'string' || Object.prototype.toString.call(fileIdObject) == '[object RegExp]' )
? {'filename':fileIdObject} : {'_id':fileIdObject}; // Attempt to locate file
collection.find(query, function(err, cursor) {

@@ -934,0 +934,0 @@ cursor.nextObject(function(err, item) {

@@ -21,5 +21,4 @@

, 'collection'
, 'connections/server'
, 'connections/repl_set_servers'
, 'connection'
, 'connection/server'
, 'connection/repl_set_servers'
, 'cursor'

@@ -54,5 +53,4 @@ , 'db'

, 'collection'
, 'connections/server'
, 'connections/repl_set_servers'
, 'connection'
, 'connection/server'
, 'connection/repl_set_servers'
, 'cursor'

@@ -89,5 +87,4 @@ , 'db'

, 'collection'
, 'connections/server'
, 'connections/repl_set_servers'
, 'connection'
, 'connection/server'
, 'connection/repl_set_servers'
, 'cursor'

@@ -94,0 +91,0 @@ , 'db'

@@ -8,41 +8,54 @@ var Long = require('../goog/math/long').Long,

**/
var MongoReply = exports.MongoReply = function(db, binary_reply) {
var MongoReply = exports.MongoReply = function() {
this.documents = [];
var index = 0;
this.index = 0;
};
MongoReply.prototype.parseHeader = function(binary_reply, bson) {
// Unpack the standard header first
var messageLength = binary_reply[index] | binary_reply[index + 1] << 8 | binary_reply[index + 2] << 16 | binary_reply[index + 3] << 24;
index = index + 4;
this.messageLength = binary_reply[this.index] | binary_reply[this.index + 1] << 8 | binary_reply[this.index + 2] << 16 | binary_reply[this.index + 3] << 24;
this.index = this.index + 4;
// Fetch the request id for this reply
this.requestId = binary_reply[index] | binary_reply[index + 1] << 8 | binary_reply[index + 2] << 16 | binary_reply[index + 3] << 24;
index = index + 4;
this.requestId = binary_reply[this.index] | binary_reply[this.index + 1] << 8 | binary_reply[this.index + 2] << 16 | binary_reply[this.index + 3] << 24;
this.index = this.index + 4;
// Fetch the id of the request that triggered the response
this.responseTo = binary_reply[index] | binary_reply[index + 1] << 8 | binary_reply[index + 2] << 16 | binary_reply[index + 3] << 24;
this.responseTo = binary_reply[this.index] | binary_reply[this.index + 1] << 8 | binary_reply[this.index + 2] << 16 | binary_reply[this.index + 3] << 24;
// Skip op-code field
index = index + 4 + 4;
this.index = this.index + 4 + 4;
// Unpack the reply message
this.responseFlag = binary_reply[index] | binary_reply[index + 1] << 8 | binary_reply[index + 2] << 16 | binary_reply[index + 3] << 24;
index = index + 4;
this.responseFlag = binary_reply[this.index] | binary_reply[this.index + 1] << 8 | binary_reply[this.index + 2] << 16 | binary_reply[this.index + 3] << 24;
this.index = this.index + 4;
// Unpack the cursor id (a 64 bit long integer)
var low_bits = binary_reply[index] | binary_reply[index + 1] << 8 | binary_reply[index + 2] << 16 | binary_reply[index + 3] << 24;
index = index + 4;
var high_bits = binary_reply[index] | binary_reply[index + 1] << 8 | binary_reply[index + 2] << 16 | binary_reply[index + 3] << 24;
index = index + 4;
this.cursorId = new db.bson_deserializer.Long(low_bits, high_bits);
var low_bits = binary_reply[this.index] | binary_reply[this.index + 1] << 8 | binary_reply[this.index + 2] << 16 | binary_reply[this.index + 3] << 24;
this.index = this.index + 4;
var high_bits = binary_reply[this.index] | binary_reply[this.index + 1] << 8 | binary_reply[this.index + 2] << 16 | binary_reply[this.index + 3] << 24;
this.index = this.index + 4;
this.cursorId = new bson.Long(low_bits, high_bits);
// Unpack the starting from
this.startingFrom = binary_reply[index] | binary_reply[index + 1] << 8 | binary_reply[index + 2] << 16 | binary_reply[index + 3] << 24;
index = index + 4;
this.startingFrom = binary_reply[this.index] | binary_reply[this.index + 1] << 8 | binary_reply[this.index + 2] << 16 | binary_reply[this.index + 3] << 24;
this.index = this.index + 4;
// Unpack the number of objects returned
this.numberReturned = binary_reply[index] | binary_reply[index + 1] << 8 | binary_reply[index + 2] << 16 | binary_reply[index + 3] << 24;
index = index + 4;
this.numberReturned = binary_reply[this.index] | binary_reply[this.index + 1] << 8 | binary_reply[this.index + 2] << 16 | binary_reply[this.index + 3] << 24;
this.index = this.index + 4;
}
MongoReply.prototype.parseBody = function(binary_reply, bson, raw) {
raw = raw == null ? false : raw;
// Let's unpack all the bson document, deserialize them and store them
for(var object_index = 0; object_index < this.numberReturned; object_index++) {
// Read the size of the bson object
var bsonObjectSize = binary_reply[index] | binary_reply[index + 1] << 8 | binary_reply[index + 2] << 16 | binary_reply[index + 3] << 24;
// Deserialize the object and add to the documents array
this.documents.push(db.bson_deserializer.BSON.deserialize(binary_reply.slice(index, index + bsonObjectSize)));
var bsonObjectSize = binary_reply[this.index] | binary_reply[this.index + 1] << 8 | binary_reply[this.index + 2] << 16 | binary_reply[this.index + 3] << 24;
// If we are storing the raw responses to pipe straight through
if(raw) {
// Deserialize the object and add to the documents array
this.documents.push(binary_reply.slice(this.index, this.index + bsonObjectSize));
} else {
// Deserialize the object and add to the documents array
this.documents.push(bson.BSON.deserialize(binary_reply.slice(this.index, this.index + bsonObjectSize)));
}
// Adjust binary index to point to next block of binary bson data
index = index + bsonObjectSize;
}
};
this.index = this.index + bsonObjectSize;
}
}

@@ -49,0 +62,0 @@ MongoReply.prototype.is_error = function(){

{ "name" : "mongodb"
, "description" : "A node.js driver for MongoDB"
, "keywords" : ["mongodb", "mongo", "driver", "db"]
, "version" : "0.9.6-23"
, "version" : "0.9.7"
, "author" : "Christian Amor Kvalheim <christkv@gmail.com>"

@@ -6,0 +6,0 @@ , "contributors" : [ "Aaron Heckmann",

@@ -14,3 +14,3 @@ var mongodb = process.env['TEST_NATIVE'] != null ? require('../lib/mongodb').native() : require('../lib/mongodb').pure();

var MONGODB = 'integration_tests';
var client = new Db(MONGODB, new Server("127.0.0.1", 27017, {auto_reconnect: true, poolSize: 1}), {native_parser: (process.env['TEST_NATIVE'] != null)});
var client = new Db(MONGODB, new Server("127.0.0.1", 27017, {auto_reconnect: true, poolSize: 4}), {native_parser: (process.env['TEST_NATIVE'] != null)});

@@ -49,3 +49,3 @@ // Define the tests, we want them to run as a nested test so we only clean up the

shouldCorrectlyCallValidateCollection : function(test) {
var fs_client = new Db(MONGODB, new Server("127.0.0.1", 27017, {auto_reconnect: false}), {native_parser: (process.env['TEST_NATIVE'] != null)});
var fs_client = new Db(MONGODB, new Server("127.0.0.1", 27017, {auto_reconnect: false, poolSize: 4}), {native_parser: (process.env['TEST_NATIVE'] != null)});
fs_client.bson_deserializer = client.bson_deserializer;

@@ -60,15 +60,17 @@ fs_client.bson_serializer = client.bson_serializer;

fs_client.admin(function(err, adminDb) {
adminDb.authenticate('admin', 'admin', function(err, replies) {
adminDb.validateCollection('test', function(err, doc) {
// Pre 1.9.1 servers
if(doc.result != null) {
test.ok(doc.result != null);
test.ok(doc.result.match(/firstExtent/) != null);
} else {
test.ok(doc.firstExtent != null);
}
adminDb.addUser('admin', 'admin', function(err, result) {
adminDb.authenticate('admin', 'admin', function(err, replies) {
adminDb.validateCollection('test', function(err, doc) {
// Pre 1.9.1 servers
if(doc.result != null) {
test.ok(doc.result != null);
test.ok(doc.result.match(/firstExtent/) != null);
} else {
test.ok(doc.firstExtent != null);
}
fs_client.close();
test.done();
});
fs_client.close();
test.done();
});
});
});

@@ -83,3 +85,3 @@ });

shouldCorrectlySetDefaultProfilingLevel : function(test) {
var fs_client = new Db(MONGODB, new Server("127.0.0.1", 27017, {auto_reconnect: false}), {native_parser: (process.env['TEST_NATIVE'] != null)});
var fs_client = new Db(MONGODB, new Server("127.0.0.1", 27017, {auto_reconnect: false, poolSize: 4}), {native_parser: (process.env['TEST_NATIVE'] != null)});
fs_client.bson_deserializer = client.bson_deserializer;

@@ -94,8 +96,10 @@ fs_client.bson_serializer = client.bson_serializer;

fs_client.admin(function(err, adminDb) {
adminDb.authenticate('admin', 'admin', function(err, replies) {
adminDb.profilingLevel(function(err, level) {
test.equal("off", level);
adminDb.addUser('admin', 'admin', function(err, result) {
adminDb.authenticate('admin', 'admin', function(err, replies) {
adminDb.profilingLevel(function(err, level) {
test.equal("off", level);
fs_client.close();
test.done();
fs_client.close();
test.done();
});
});

@@ -111,3 +115,3 @@ });

shouldCorrectlyChangeProfilingLevel : function(test) {
var fs_client = new Db(MONGODB, new Server("127.0.0.1", 27017, {auto_reconnect: false}), {native_parser: (process.env['TEST_NATIVE'] != null)});
var fs_client = new Db(MONGODB, new Server("127.0.0.1", 27017, {auto_reconnect: false, poolSize: 4}), {native_parser: (process.env['TEST_NATIVE'] != null)});
fs_client.bson_deserializer = client.bson_deserializer;

@@ -157,3 +161,3 @@ fs_client.bson_serializer = client.bson_serializer;

shouldCorrectlySetAndExtractProfilingInfo : function(test) {
var fs_client = new Db(MONGODB, new Server("127.0.0.1", 27017, {auto_reconnect: false}), {native_parser: (process.env['TEST_NATIVE'] != null)});
var fs_client = new Db(MONGODB, new Server("127.0.0.1", 27017, {auto_reconnect: false, poolSize: 4}), {native_parser: (process.env['TEST_NATIVE'] != null)});
fs_client.bson_deserializer = client.bson_deserializer;

@@ -193,3 +197,3 @@ fs_client.bson_serializer = client.bson_serializer;

},
// run this last

@@ -206,140 +210,2 @@ noGlobalsLeaked : function(test) {

// Assign out tests
module.exports = tests;
// test_kill_cursors : function() {
// var test_kill_cursors_client = new Db('integration_tests4_', new Server("127.0.0.1", 27017, {auto_reconnect: true}), {});
// test_kill_cursors_client.bson_deserializer = client.bson_deserializer;
// test_kill_cursors_client.bson_serializer = client.bson_serializer;
// test_kill_cursors_client.pkFactory = client.pkFactory;
//
// test_kill_cursors_client.open(function(err, test_kill_cursors_client) {
// var number_of_tests_done = 0;
//
// test_kill_cursors_client.dropCollection('test_kill_cursors', function(err, collection) {
// test_kill_cursors_client.createCollection('test_kill_cursors', function(err, collection) {
// test_kill_cursors_client.cursorInfo(function(err, cursorInfo) {
// var clientCursors = cursorInfo.clientCursors_size;
// var byLocation = cursorInfo.byLocation_size;
//
// for(var i = 0; i < 1000; i++) {
// collection.save({'i': i}, function(err, doc) {});
// }
//
// test_kill_cursors_client.cursorInfo(function(err, cursorInfo) {
// test.equal(clientCursors, cursorInfo.clientCursors_size);
// test.equal(byLocation, cursorInfo.byLocation_size);
//
// for(var i = 0; i < 10; i++) {
// collection.findOne(function(err, item) {});
// }
//
// test_kill_cursors_client.cursorInfo(function(err, cursorInfo) {
// test.equal(clientCursors, cursorInfo.clientCursors_size);
// test.equal(byLocation, cursorInfo.byLocation_size);
//
// for(var i = 0; i < 10; i++) {
// collection.find(function(err, cursor) {
// cursor.nextObject(function(err, item) {
// cursor.close(function(err, cursor) {});
//
// if(i == 10) {
// test_kill_cursors_client.cursorInfo(function(err, cursorInfo) {
// test.equal(clientCursors, cursorInfo.clientCursors_size);
// test.equal(byLocation, cursorInfo.byLocation_size);
//
// collection.find(function(err, cursor) {
// cursor.nextObject(function(err, item) {
// test_kill_cursors_client.cursorInfo(function(err, cursorInfo) {
// test.equal(clientCursors, cursorInfo.clientCursors_size);
// test.equal(byLocation, cursorInfo.byLocation_size);
//
// cursor.close(function(err, cursor) {
// test_kill_cursors_client.cursorInfo(function(err, cursorInfo) {
// test.equal(clientCursors, cursorInfo.clientCursors_size);
// test.equal(byLocation, cursorInfo.byLocation_size);
//
// collection.find({}, {'limit':10}, function(err, cursor) {
// cursor.nextObject(function(err, item) {
// test_kill_cursors_client.cursorInfo(function(err, cursorInfo) {
// test_kill_cursors_client.cursorInfo(function(err, cursorInfo) {
// sys.puts("===================================== err: " + err)
// sys.puts("===================================== cursorInfo: " + sys.inspect(cursorInfo))
//
//
// test.equal(clientCursors, cursorInfo.clientCursors_size);
// test.equal(byLocation, cursorInfo.byLocation_size);
// number_of_tests_done = 1;
// });
// });
// });
// });
// });
// });
// });
// });
// });
// });
// }
// });
// });
// }
// });
// });
// });
// });
// });
//
// var intervalId = setInterval(function() {
// if(number_of_tests_done == 1) {
// clearInterval(intervalId);
// finished_test({test_kill_cursors:'ok'});
// test_kill_cursors_client.close();
// }
// }, 100);
// });
// },
// test_force_binary_error : function() {
// client.createCollection('test_force_binary_error', function(err, collection) {
// // Try to fetch an object using a totally invalid and wrong hex string... what we're interested in here
// // is the error handling of the findOne Method
// var result= "";
// var hexString = "5e9bd59248305adf18ebc15703a1";
// for(var index=0 ; index < hexString.length; index+=2) {
// var string= hexString.substr(index, 2);
// var number= parseInt(string, 16);
// result+= BinaryParser.fromByte(number);
// }
//
// // Generate a illegal ID
// var id = client.bson_serializer.ObjectID.createFromHexString('5e9bd59248305adf18ebc157');
// id.id = result;
//
// // Execute with error
// collection.findOne({"_id": id}, function(err, result) {
// // test.equal(undefined, result)
// test.ok(err != null)
// finished_test({test_force_binary_error:'ok'});
// });
// });
// },
// test_long_term_insert : function() {
// var numberOfTimes = 21000;
//
// client.createCollection('test_safe_insert', function(err, collection) {
// var timer = setInterval(function() {
// collection.insert({'test': 1}, {safe:true}, function(err, result) {
// numberOfTimes = numberOfTimes - 1;
//
// if(numberOfTimes <= 0) {
// clearInterval(timer);
// collection.count(function(err, count) {
// test.equal(21000, count);
// finished_test({test_long_term_insert:'ok'})
// });
// }
// });
// }, 1);
// });
// },
module.exports = tests;

@@ -83,12 +83,10 @@ var mongodb = process.env['TEST_NATIVE'] != null ? require('../lib/mongodb').native() : require('../lib/mongodb').pure();

test.ok(err == null);
test.ok(replies);
// Kill a connection to force a reconnect
p_client.serverConfig.connection.pool[0].connection.end();
p_client.serverConfig.close();
p_client.createCollection('shouldCorrectlyReAuthorizeReconnectedConnections', function(err, collection) {
collection.insert({a:1}, {safe:true}, function(err, r) {
collection.insert({a:2}, {safe:true}, function(err, r) {
collection.insert({a:3}, {safe:true}, function(err, r) {
collection.insert({a:3}, {safe:true}, function(err, r) {
collection.count(function(err, count) {

@@ -141,3 +139,3 @@ test.equal(3, count);

},
// run this last

@@ -144,0 +142,0 @@ noGlobalsLeaked : function(test) {

var mongodb = process.env['TEST_NATIVE'] != null ? require('../../lib/mongodb').native() : require('../../lib/mongodb').pure();
var testCase = require('../../deps/nodeunit').testCase,
debug = require('util').debug
debug = require('util').debug,
inspect = require('util').inspect,
nodeunit = require('../../deps/nodeunit'),
gleak = require('../../tools/gleak'),
Db = mongodb.Db,

@@ -93,3 +94,2 @@ Cursor = mongodb.Cursor,

test.ok(result2 != null);
admin.logout(this.parallel());

@@ -146,10 +146,11 @@ db1.admin().logout(this.parallel());

db1.collection('stuff', function(err, collection) {
collection.find().toArray(function(err, items) {
test.ok(err == null);
test.equal(1, items.length);
db1.collection('stuff', function(err, collection) {
collection.insert({a:2}, {safe:true}, self.parallel());
});
db2.collection('stuff', function(err, collection) {

@@ -178,4 +179,3 @@ collection.insert({a:2}, {safe:true}, self.parallel());

function logoutDb2(err, result) {
test.ok(err != null);
test.ok(err != null);
db2.logout(this);

@@ -190,2 +190,6 @@ },

test.done();
// Close all connections
db1.close();
db2.close();
admin.close();
});

@@ -196,2 +200,8 @@ });

},
noGlobalsLeaked : function(test) {
var leaks = gleak.detectNew();
test.equal(0, leaks.length, "global var leak detected: " + leaks.join(', '));
test.done();
}
})

@@ -198,0 +208,0 @@

var mongodb = process.env['TEST_NATIVE'] != null ? require('../../lib/mongodb').native() : require('../../lib/mongodb').pure();
var testCase = require('../../deps/nodeunit').testCase,
debug = require('util').debug
debug = require('util').debug,
inspect = require('util').inspect,
nodeunit = require('../../deps/nodeunit'),
gleak = require('../../tools/gleak'),
Db = mongodb.Db,

@@ -18,2 +19,3 @@ Cursor = mongodb.Cursor,

var serverManager = null;
var RS = RS == null ? null : RS;

@@ -38,11 +40,10 @@ // Define the tests, we want them to run as a nested test so we only clean up the

RS.killAll(function() {
callback();
})
callback();
});
},
shouldCorrectlyAuthenticate : function(test) {
shouldCorrectlyAuthenticateWithMultipleLoginsAndLogouts : function(test) {
var replSet = new ReplSetServers( [
new Server( RS.host, RS.ports[1], { auto_reconnect: true } ),
new Server( RS.host, RS.ports[0], { auto_reconnect: true } ),
new Server( RS.host, RS.ports[2], { auto_reconnect: true } )
],

@@ -83,3 +84,3 @@ {rs_name:RS.name}

db.collection("stuff", function(err, collection) {
collection.insert({a:2}, {safe: {w: 3}}, self);
collection.insert({a:3}, {safe: true}, self);
});

@@ -99,3 +100,3 @@ },

test.ok(err == null);
test.equal(2, item.a);
test.equal(3, item.a);

@@ -125,7 +126,6 @@ db.admin().logout(this);

});
},
},
function shouldCorrectlyAuthenticateAgainstSecondary(err, result) {
test.ok(err != null)
test.ok(err != null)
slaveDb.admin().authenticate('me', 'secret', this);

@@ -146,5 +146,7 @@ },

test.ok(err == null);
test.equal(2, item.a);
test.equal(3, item.a);
test.done();
p_db.close();
slaveDb.close();
}

@@ -159,3 +161,2 @@ )

new Server( RS.host, RS.ports[0], { auto_reconnect: true } ),
new Server( RS.host, RS.ports[2], { auto_reconnect: true } )
],

@@ -178,5 +179,5 @@ {rs_name:RS.name, read_secondary:true}

test.ok(result != null);
db.collection("stuff", function(err, collection) {
collection.insert({a:2}, {safe: {w: 3}}, self);
collection.insert({a:2}, {safe: {w: 2, wtimeout: 10000}}, self);
});

@@ -195,5 +196,5 @@ },

test.ok(result);
db.collection("stuff", function(err, collection) {
collection.insert({a:2}, {safe: {w: 3}}, self);
collection.insert({a:2}, {safe: {w: 2, wtimeout: 10000}}, self);
});

@@ -215,6 +216,13 @@ },

test.done();
p_db.close();
}
)
});
}
},
noGlobalsLeaked : function(test) {
var leaks = gleak.detectNew();
test.equal(0, leaks.length, "global var leak detected: " + leaks.join(', '));
test.done();
}
})

@@ -221,0 +229,0 @@

@@ -224,5 +224,5 @@ var mongodb = process.env['TEST_NATIVE'] != null ? require('../lib/mongodb').native() : require('../lib/mongodb').pure();

error_client.bson_serializer = client.bson_serializer;
error_client.pkFactory = client.pkFactory;
error_client.pkFactory = client.pkFactory;
test.equal(true, error_client.strict);
error_client.open(function(err, error_client) {

@@ -233,3 +233,3 @@ error_client.collection('does-not-exist', function(err, collection) {

});
error_client.createCollection('test_strict_access_collection', function(err, collection) {

@@ -288,21 +288,21 @@ error_client.collection('test_strict_access_collection', function(err, collection) {

test.equal("key $hello must not start with '$'", err.message);
collection.insert({'he$llo':'world'}, {safe:true}, function(err, docs) {
test.ok(docs[0].constructor == Object);
collection.insert({'hello':{'hell$o':'world'}}, {safe:true}, function(err, docs) {
test.ok(err == null);
collection.insert({'.hello':'world'}, {safe:true}, function(err, doc) {
test.ok(err instanceof Error);
test.equal("key .hello must not contain '.'", err.message);
collection.insert({'hello':{'.hello':'world'}}, {safe:true}, function(err, doc) {
test.ok(err instanceof Error);
test.equal("key .hello must not contain '.'", err.message);
collection.insert({'hello.':'world'}, {safe:true}, function(err, doc) {
test.ok(err instanceof Error);
test.equal("key hello. must not contain '.'", err.message);
collection.insert({'hello':{'hello.':'world'}}, {safe:true}, function(err, doc) {

@@ -469,3 +469,3 @@ test.ok(err instanceof Error);

test.equal(1, doc.a);
id = new client.bson_serializer.ObjectID(null)

@@ -485,3 +485,3 @@ doc = {_id:id, a:2};

test.equal(2, doc2.a);
collection.update({"_id":id}, doc2, {safe:true, upsert:true}, function(err, result) {

@@ -635,3 +635,3 @@ test.equal(null, err);

},
// run this last

@@ -638,0 +638,0 @@ noGlobalsLeaked : function(test) {

@@ -70,19 +70,22 @@ var mongodb = process.env['TEST_NATIVE'] != null ? require('../lib/mongodb').native() : require('../lib/mongodb').pure();

testCloseNoCallback : function(test) {
var db = new Db(MONGODB, new Server("127.0.0.1", 27017, {auto_reconnect: true, poolSize: 4}),
{native_parser: (process.env['TEST_NATIVE'] != null)});
var db = new Db(MONGODB, new Server("127.0.0.1", 27017, {auto_reconnect: true, poolSize: 4}), {native_parser: (process.env['TEST_NATIVE'] != null)});
db.open(connectionTester(test, 'testCloseNoCallback', function() {
var dbCloseCount = 0, connectionCloseCount = 0, poolCloseCount = 0;
db.on('close', function() { ++dbCloseCount; });
var connection = db.serverConfig.connection;
connection.on('close', function() { ++connectionCloseCount; });
connection.pool.forEach(function(poolMember) {
poolMember.connection.on('close', function() { ++poolCloseCount; });
});
db.close();
setTimeout(function() {
test.equal(dbCloseCount, 1);
test.equal(connectionCloseCount, 1);
test.equal(poolCloseCount, 4);
test.done();
}, 250);
// Ensure no close events are fired as we are closing the connection specifically
db.on('close', function() { dbCloseCount++; });
var connectionPool = db.serverConfig.connectionPool;
var connections = connectionPool.getAllConnections();
var keys = Object.keys(connections);
// Ensure no close events are fired as we are closing the connection specifically
for(var i = 0; i < keys.length; i++) {
connections[keys[i]].on("close", function() { test.ok(false); });
}
// Force the connection close
db.serverConfig.connectionPool.stop();
// Test done
test.equal(1, dbCloseCount);
test.done();
}));

@@ -92,21 +95,21 @@ },

testCloseWithCallback : function(test) {
var db = new Db(MONGODB, new Server("127.0.0.1", 27017, {auto_reconnect: true, poolSize: 4}),
{native_parser: (process.env['TEST_NATIVE'] != null)});
var db = new Db(MONGODB, new Server("127.0.0.1", 27017, {auto_reconnect: true, poolSize: 4}),{native_parser: (process.env['TEST_NATIVE'] != null)});
db.open(connectionTester(test, 'testCloseWithCallback', function() {
var dbCloseCount = 0, connectionCloseCount = 0, poolCloseCount = 0;
db.on('close', function() { ++dbCloseCount; });
var connection = db.serverConfig.connection;
connection.on('close', function() { ++connectionCloseCount; });
connection.pool.forEach(function(poolMember) {
poolMember.connection.on('close', function() { ++poolCloseCount; });
});
// Ensure no close events are fired as we are closing the connection specifically
db.on('close', function() { dbCloseCount++; });
var connectionPool = db.serverConfig.connectionPool;
var connections = connectionPool.getAllConnections();
var keys = Object.keys(connections);
// Ensure no close events are fired as we are closing the connection specifically
for(var i = 0; i < keys.length; i++) {
connections[keys[i]].on("close", function() { test.ok(false); });
}
db.close(function() {
// Let all events fire.
process.nextTick(function() {
test.equal(dbCloseCount, 1);
test.equal(connectionCloseCount, 1);
test.equal(poolCloseCount, 4);
test.done();
});
// Test done
test.equal(0, dbCloseCount);
test.done();
});

@@ -113,0 +116,0 @@ }));

@@ -21,3 +21,3 @@ var mongodb = process.env['TEST_NATIVE'] != null ? require('../../lib/mongodb').native() : require('../../lib/mongodb').pure();

'Should Correctly create a pool instance with the expected values' : function(test) {
var connectionPool = new ConnectionPool('localhost', 2000, 1, {timeout:100, noDelay:true});
var connectionPool = new ConnectionPool('localhost', 2000, 1, null, {timeout:100, noDelay:true});
test.equal(100, connectionPool.socketOptions.timeout);

@@ -31,12 +31,12 @@ test.equal(true, connectionPool.socketOptions.noDelay);

'Should correctly fail due to no server' : function(test) {
var connectionPool = new ConnectionPool('localhost', 2000, 1, {timeout:100, noDelay:true});
// Add event handler that will fire once the pool is ready
connectionPool.on("poolReady", function(err, result) {
var connectionPool = new ConnectionPool('localhost', 2000, 4, null, {timeout:100, noDelay:true});
// // Add event handler that will fire once the pool is ready
connectionPool.on("poolReady", function(err, result) {
})
// Add event handler that will fire when it fails
connectionPool.on("error", function(err, connection) {
test.equal(0, Object.keys(connectionPool.waitingToOpen).length);
test.equal(1, Object.keys(connectionPool.connectionsWithErrors).length);
test.equal(4, Object.keys(connectionPool.connectionsWithErrors).length);
test.equal(0, Object.keys(connectionPool.openConnections).length);

@@ -49,6 +49,6 @@ test.done();

},
'Should Correctly create a pool of connections and receive an ok when all connections are active' : function(test) {
var connectionPool = new ConnectionPool('localhost', 27017, 4, {timeout:100, noDelay:true});
// Add event handler that will fire once the pool is ready

@@ -63,2 +63,22 @@ connectionPool.on("poolReady", function() {

'Should Correctly connect and then force a restart creating new connections' : function(test) {
var connectionPool = new ConnectionPool('localhost', 27017, 4, {timeout:100, noDelay:true});
var done = false;
// Add event handler that will fire once the pool is ready
connectionPool.on("poolReady", function() {
// Restart
if(done) {
test.done();
} else {
// Trigger stop
connectionPool.restart();
done = true;
}
})
// Start the pool
connectionPool.start();
},
noGlobalsLeaked : function(test) {

@@ -65,0 +85,0 @@ var leaks = gleak.detectNew();

@@ -979,3 +979,35 @@ var testCase = require('../deps/nodeunit').testCase,

},
'Should correctly rewind and restart cursor' : function(test) {
var docs = [];
for(var i = 0; i < 100; i++) {
var d = new Date().getTime() + i*1000;
docs[i] = {'a':i, createdAt:new Date(d)};
}
// Create collection
client.createCollection('Should_correctly_rewind_and_restart_cursor', function(err, collection) {
test.equal(null, err);
// insert all docs
collection.insert(docs, {safe:true}, function(err, result) {
test.equal(null, err);
var cursor = collection.find({});
cursor.nextObject(function(err, item) {
test.equal(0, item.a)
// Rewind the cursor
cursor.rewind();
// Grab the first object
cursor.nextObject(function(err, item) {
test.equal(0, item.a)
test.done();
})
})
})
});
},
// run this last

@@ -982,0 +1014,0 @@ noGlobalsLeaked: function(test) {

@@ -90,3 +90,2 @@ var mongodb = process.env['TEST_NATIVE'] != null ? require('../lib/mongodb').native() : require('../lib/mongodb').pure();

}
})

@@ -93,0 +92,0 @@

@@ -98,13 +98,10 @@ var mongodb = process.env['TEST_NATIVE'] != null ? require('../lib/mongodb').native() : require('../lib/mongodb').pure();

shouldCorrectlyPerformAutomaticConnect : function(test) {
var automatic_connect_client = new Db(MONGODB, new Server("127.0.0.1", 27017, {auto_reconnect: true}), {native_parser: (process.env['TEST_NATIVE'] != null)});
var automatic_connect_client = new Db(MONGODB, new Server("127.0.0.1", 27017, {auto_reconnect: true}), {native_parser: (process.env['TEST_NATIVE'] != null), retryMiliSeconds:50});
automatic_connect_client.bson_deserializer = client.bson_deserializer;
automatic_connect_client.bson_serializer = client.bson_serializer;
automatic_connect_client.pkFactory = client.pkFactory;
automatic_connect_client.open(function(err, automatic_connect_client) {
// Listener for closing event
var closeListener = function(has_error) {
// Remove the listener for the close to avoid loop
automatic_connect_client.removeListener("close", closeListener);
// Let's insert a document

@@ -133,19 +130,2 @@ automatic_connect_client.collection('test_object_id_generation.data2', function(err, collection) {

// Test that error conditions are handled correctly
shouldCorrectlyHandleConnectionErrors : function(test) {
// Test error handling for single server connection
var serverConfig = new Server("127.0.0.1", 21017, {auto_reconnect: true});
var error_client = new Db(MONGODB, serverConfig, {native_parser: (process.env['TEST_NATIVE'] != null) ? true : false});
error_client.on("error", function(err) {});
error_client.on("close", function(connection) {
test.ok(typeof connection == typeof serverConfig);
test.equal("127.0.0.1", connection.host);
test.equal(21017, connection.port);
test.equal(true, connection.autoReconnect);
test.done();
});
error_client.open(function(err, error_client) {});
},
shouldCorrectlyExecuteEvalFunctions : function(test) {

@@ -152,0 +132,0 @@ client.eval('function (x) {return x;}', [3], function(err, result) {

@@ -88,3 +88,3 @@ var mongodb = process.env['TEST_NATIVE'] != null ? require('../lib/mongodb').native() : require('../lib/mongodb').pure();

error_client.error(function(err, documents) {
error_client.error(function(err, documents) {
test.equal(true, documents[0].ok);

@@ -98,3 +98,3 @@ test.equal(0, documents[0].n);

test.ok(err instanceof Error);
test.ok('notConnected' === err.message);
test.equal('no open connections', err.message);
test.done();

@@ -240,15 +240,15 @@ });

test.ok(err instanceof Error);
test.ok('notConnected' === err.message);
test.equal('no open connections', err.message);
collection.update({ inserted: true }, { inserted: true, x: 1 }, { safe: true }, function (err) {
test.ok(err instanceof Error);
test.ok('notConnected' === err.message);
test.equal('no open connections', err.message);
collection.remove({ inserted: true }, { safe: true }, function (err) {
test.ok(err instanceof Error);
test.ok('notConnected' === err.message);
test.equal('no open connections', err.message);
collection.findOne({ works: true }, function (err) {
test.ok(err instanceof Error);
test.ok('notConnected' === err.message);
test.equal('no open connections', err.message);
test.done();

@@ -258,4 +258,3 @@ });

});
});
});
});

@@ -273,5 +272,5 @@ });

test.ok(err == null);
var query = {a:{$within:{$box:[[1,-10],[80,120]]}}};
// We don't have a geospatial index at this point

@@ -288,3 +287,3 @@ collection.findOne(query, function(err, docs) {

var invalidQuery = {a:{$within:{$box:[[-10,-180],[10,180]]}}};
client.admin().serverInfo(function(err, result){

@@ -298,5 +297,5 @@ collection.findOne(invalidQuery, function(err, doc) {

}
test.done();
});
});
});

@@ -310,3 +309,3 @@ });

},
noGlobalsLeaked : function(test) {

@@ -313,0 +312,0 @@ var leaks = gleak.detectNew();

@@ -50,4 +50,3 @@ var mongodb = process.env['TEST_NATIVE'] != null ? require('../lib/mongodb').native() : require('../lib/mongodb').pure();

client.collection('shouldCorrectlyHandleThrownError', function(err, collection) {
test.done();
//debug(someUndefinedVariable);
debug(someUndefinedVariable);
});

@@ -71,4 +70,3 @@ } catch (err) {

collection.rename("shouldCorrectlyHandleThrownErrorInRename2", function(err, result) {
test.done();
//debug(someUndefinedVariable);
debug(someUndefinedVariable);
})

@@ -75,0 +73,0 @@ });

@@ -47,52 +47,3 @@ var mongodb = process.env['TEST_NATIVE'] != null ? require('../lib/mongodb').native() : require('../lib/mongodb').pure();

},
'Error thrown in handler test': function(test){
// Should not be called
var exceptionHandler = function(exception) {
numberOfFailsCounter++;
//console.log('Exception caught: ' + exception.message);
//console.log(inspect(exception.stack.toString()))
};
// Number of times we should fail
var numberOfFailsCounter = 0;
process.on('uncaughtException', exceptionHandler)
client.createCollection('error_test', function(err, collection) {
var testObject = {};
for(var i = 0; i < 5000; i++){
testObject['setting_' + i] = i;
}
testObject.name = 'test1';
var c = 0;
var counter = 0;
collection.insert([testObject, {name:'test2'}], {safe:true}, function(err, doc) {
var findOne = function(){
collection.findOne({name: 'test1'}, function(err, doc) {
counter++;
if(counter > POOL_SIZE){
process.removeListener('uncaughtException', exceptionHandler);
collection.findOne({name: 'test1'}, function(err, doc) {
test.equal(5002, Object.keys(doc).length)
test.equal(4, numberOfFailsCounter);
test.done();
});
} else {
process.nextTick(findOne);
throw new Error('Some error');
}
});
};
findOne();
});
});
},
// Test a simple find

@@ -150,6 +101,6 @@ shouldCorrectlyPerformSimpleFind : function(test) {

test.equal(2, documents.length);
collection.count(function(err, count) {
test.equal(2, count);
// Fetch values by selection

@@ -911,18 +862,87 @@ collection.find({'a': doc1.a}).toArray(function(err, documents) {

// Should correctly execute findAndModify that is breaking in prod
// shouldCorrectlyExecuteFindAndModify : function(test) {
// client.createCollection('shouldCorrectlyExecuteFindAndModify', function(err, collection) {
// var self = {_id : new client.bson_serializer.ObjectID()}
// var _uuid = 'sddffdss'
//
// collection.findAndModify(
// {_id: self._id, 'plays.uuid': _uuid},
// [],
// {$set : {'plays.$.active': true}},
// {new: true, fields: {plays: 0, results: 0}, safe: true},
// function(err, contest) {
// test.done();
// })
// });
// },
shouldCorrectlyExecuteFindAndModify : function(test) {
client.createCollection('shouldCorrectlyExecuteFindAndModify', function(err, collection) {
var self = {_id : new client.bson_serializer.ObjectID()}
var _uuid = 'sddffdss'
collection.findAndModify(
{_id: self._id, 'plays.uuid': _uuid},
[],
{$set : {'plays.$.active': true}},
{new: true, fields: {plays: 0, results: 0}, safe: true},
function(err, contest) {
test.done();
})
});
},
'Should correctly return record with 64-bit id' : function(test) {
client.createCollection('should_correctly_return_record_with_64bit_id', function(err, collection) {
var _lowerId = new client.bson_serializer.ObjectID();
var _higherId = new client.bson_serializer.ObjectID();
var lowerId = new client.bson_serializer.Long.fromString('133118461172916224', 10);
var higherId = new client.bson_serializer.Long.fromString('133118461172916225', 10);
var lowerDoc = {_id:_lowerId, id: lowerId};
var higherDoc = {_id:_higherId, id: higherId};
collection.insert([lowerDoc, higherDoc], {safe:true}, function(err, result) {
test.ok(err == null);
// Select record with id of 133118461172916225 using $gt directive
collection.find({id: {$gt: lowerId}}, {}, function(err, cur) {
test.ok(err == null);
cur.toArray(function(err, arr) {
test.ok(err == null);
test.equal(arr.length, 1, 'Selecting record via $gt directive on 64-bit integer should return a record with higher Id')
test.equal(arr[0].id.toString(), '133118461172916225', 'Returned Id should be equal to 133118461172916225')
test.done()
});
});
});
});
},
'Should Correctly find a Document using findOne excluding _id field' : function(test) {
client.createCollection('Should_Correctly_find_a_Document_using_findOne_excluding__id_field', function(err, collection) {
var doc = {_id : new client.bson_serializer.ObjectID(), a:1, c:2}
// insert doc
collection.insert(doc, {safe:true}, function(err, result) {
// Get one document, excluding the _id field
collection.findOne({a:1}, {fields:{'_id': 0}}, function(err, item) {
test.equal(null, item._id);
test.equal(1, item.a);
test.equal(2, item.c);
collection.find({a:1}, {fields:{'_id':0}}).toArray(function(err, items) {
var item = items[0]
test.equal(null, item._id);
test.equal(1, item.a);
test.equal(2, item.c);
test.done();
})
})
});
});
},
'Should correctly execute find and findOne queries in the same way' : function(test) {
client.createCollection('Should_correctly_execute_find_and_findOne_queries_in_the_same_way', function(err, collection) {
var doc = {_id : new client.bson_serializer.ObjectID(), a:1, c:2, comments:[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]};
// insert doc
collection.insert(doc, {safe:true}, function(err, result) {
collection.find({_id: doc._id}, {comments: {$slice: -5}}).toArray(function(err, docs) {
test.equal(5, docs[0].comments.length)
collection.findOne({_id: doc._id}, {comments: {$slice: -5}}, function(err, item) {
test.equal(5, item.comments.length)
test.done();
});
});
});
});
},
noGlobalsLeaked : function(test) {

@@ -929,0 +949,0 @@ var leaks = gleak.detectNew();

@@ -230,3 +230,2 @@ var mongodb = process.env['TEST_NATIVE'] != null ? require('../../lib/mongodb').native() : require('../../lib/mongodb').pure();

for(var i = 0; i < data.length; i++) {
// debug(" i = " + i)
test.equal(data2[i], data[i])

@@ -233,0 +232,0 @@ test.equal(streamData[i], data[i])

@@ -657,2 +657,21 @@ var mongodb = process.env['TEST_NATIVE'] != null ? require('../../lib/mongodb').native() : require('../../lib/mongodb').pure();

shouldCheckExistsByUsingRegexp : function(test) {
var gridStore = new GridStore(client, 'shouldCheckExistsByUsingRegexp.txt', 'w');
gridStore.open(function(err, gridStore) {
gridStore.write('bar', function(err, gridStore) {
gridStore.close(function(err, result) {
GridStore.exist(client, /shouldCheck/, function(err, result) {
test.equal(null, err);
test.equal(true, result);
client.close();
test.done();
});
});
});
});
},
noGlobalsLeaked : function(test) {

@@ -659,0 +678,0 @@ var leaks = gleak.detectNew();

@@ -546,3 +546,3 @@ var mongodb = process.env['TEST_NATIVE'] != null ? require('../lib/mongodb').native() : require('../lib/mongodb').pure();

// Insert the update
collection.insert({i:1, z:func }, {safe:true}, function(err, result) {
collection.insert({i:1, z:func }, {safe:true, serializeFunctions:true}, function(err, result) {
collection.findOne({_id:result[0]._id}, function(err, object) {

@@ -740,2 +740,81 @@ test.equal(func.toString(), object.z.code);

'Should Correctly allow for control of serialization of functions on command level' : function(test) {
var doc = {
str : "String",
func : function() {}
}
client.createCollection("Should_Correctly_allow_for_control_of_serialization_of_functions_on_command_level", function(err, collection) {
test.ok(err == null);
collection.insert(doc, {safe:true}, function(err, result) {
collection.update({str:"String"}, {$set:{c:1, d:function(){}}}, {safe:true, serializeFunctions:false}, function(err, result) {
test.equal(1, result);
collection.findOne({str:"String"}, function(err, item) {
test.equal(null, item.d);
// Execute a safe insert with replication to two servers
collection.findAndModify({str:"String"}, [['a', 1]], {'$set':{'f':function() {}}}, {new:true, safe: true, serializeFunctions:true}, function(err, result) {
test.ok(result.f instanceof client.bson_deserializer.Code)
test.done();
})
})
})
});
});
},
'Should Correctly allow for control of serialization of functions on collection level' : function(test) {
var doc = {
str : "String",
func : function() {}
}
client.createCollection("Should_Correctly_allow_for_control_of_serialization_of_functions_on_collection_level", {serializeFunctions:true}, function(err, collection) {
test.ok(err == null);
collection.insert(doc, {safe:true}, function(err, result) {
test.equal(null, err);
collection.findOne({str : "String"}, function(err, item) {
test.ok(item.func instanceof client.bson_deserializer.Code);
test.done();
});
});
});
},
'Should Correctly allow for using a Date object as _id' : function(test) {
var doc = {
_id : new Date(),
str : 'hello'
}
client.createCollection("Should_Correctly_allow_for_using_a_Date_object_as__id", {serializeFunctions:true}, function(err, collection) {
test.ok(err == null);
collection.insert(doc, {safe:true}, function(err, result) {
test.equal(null, err);
collection.findOne({str : "hello"}, function(err, item) {
test.ok(item._id instanceof Date);
test.done();
});
});
});
},
'Should Correctly fail to update returning 0 results' : function(test) {
client.createCollection("Should_Correctly_fail_to_update_returning_0_results", {serializeFunctions:true}, function(err, collection) {
test.ok(err == null);
collection.update({a:1}, {$set: {a:1}}, {safe:true}, function(err, numberOfUpdated) {
test.equal(0, numberOfUpdated);
test.done();
});
});
},
noGlobalsLeaked : function(test) {

@@ -742,0 +821,0 @@ var leaks = gleak.detectNew();

@@ -46,18 +46,18 @@ var mongodb = process.env['TEST_NATIVE'] != null ? require('../lib/mongodb').native() : require('../lib/mongodb').pure();

// shouldCorrectlyInsertSimpleRegExpDocument : function(test) {
// var regexp = /foobar/i;
//
// client.createCollection('test_regex', function(err, collection) {
// collection.insert({'b':regexp}, {safe:true}, function(err, ids) {
// collection.find({}, {'fields': ['b']}, function(err, cursor) {
// cursor.toArray(function(err, items) {
// test.equal(("" + regexp), ("" + items[0].b));
// // Let's close the db
// test.done();
// });
// });
// });
// });
// },
shouldCorrectlyInsertSimpleRegExpDocument : function(test) {
var regexp = /foobar/i;
client.createCollection('test_regex', function(err, collection) {
collection.insert({'b':regexp}, {safe:true}, function(err, ids) {
collection.find({}, {'fields': ['b']}, function(err, cursor) {
cursor.toArray(function(err, items) {
test.equal(("" + regexp), ("" + items[0].b));
// Let's close the db
test.done();
});
});
});
});
},
shouldCorrectlyInsertSimpleUTF8Regexp : function(test) {

@@ -64,0 +64,0 @@ var regexp = /foobaré/;

@@ -0,4 +1,7 @@

var noReplicasetStart = process.env['NO_REPLICASET_START'] != null ? true : false;
var testCase = require('../../deps/nodeunit').testCase,
debug = require('util').debug
debug = require('util').debug,
inspect = require('util').inspect,
gleak = require('../../tools/gleak'),
ReplicaSetManager = require('../tools/replica_set_manager').ReplicaSetManager,

@@ -12,2 +15,3 @@ Db = require('../../lib/mongodb').Db,

var retries = 120;
var RS = RS == null ? null : RS;

@@ -27,5 +31,15 @@ var ensureConnection = function(test, numberOfTries, callback) {

var db = new Db('integration_test_', replSet);
db.open(function(err, p_db) {
// Print any errors
db.on("error", function(err) {
console.log("============================= ensureConnection caught error")
console.dir(err)
if(err != null && err.stack != null) console.log(err.stack)
db.close();
})
// Open the db
db.open(function(err, p_db) {
db.close();
if(err != null) {
db.close();
// Wait for a sec and retry

@@ -35,5 +49,5 @@ setTimeout(function() {

ensureConnection(test, numberOfTries, callback);
}, 1000);
}, 3000);
} else {
return callback(null, p_db);
return callback(null);
}

@@ -46,6 +60,6 @@ })

// Create instance of replicaset manager but only for the first call
if(!serversUp) {
if(!serversUp && !noReplicasetStart) {
serversUp = true;
RS = new ReplicaSetManager({retries:120});
RS.startSet(true, function(err, result) {
RS = new ReplicaSetManager({retries:120, passive_count:0});
RS.startSet(true, function(err, result) {
callback();

@@ -62,4 +76,4 @@ });

RS.restartKilledNodes(function(err, result) {
callback();
})
callback();
});
},

@@ -81,2 +95,3 @@

test.done();
p_db.close();
})

@@ -98,11 +113,8 @@ },

db.on('close', function() { ++dbCloseCount; });
db.serverConfig.servers.forEach(function(server) {
server.connection.on('close', function() { ++serverCloseCount; });
});
db.close();
setTimeout(function() {
test.equal(dbCloseCount, 1);
test.equal(serverCloseCount, db.serverConfig.servers.length);
test.done();
}, 250);
}, 2000);
})

@@ -122,13 +134,10 @@ },

test.equal(null, err);
var dbCloseCount = 0, serverCloseCount = 0;
var dbCloseCount = 0;//, serverCloseCount = 0;
db.on('close', function() { ++dbCloseCount; });
var connection = db.serverConfig.connection;
db.serverConfig.servers.forEach(function(server) {
server.connection.on('close', function() { ++serverCloseCount; });
});
db.close(function() {
// Let all events fire.
process.nextTick(function() {
test.equal(dbCloseCount, 1);
test.equal(serverCloseCount, db.serverConfig.servers.length);
test.equal(dbCloseCount, 0);
// test.equal(serverCloseCount, db.serverConfig.servers.length);
test.done();

@@ -158,3 +167,10 @@ });

shouldConnectWithPrimarySteppedDown : function(test) {
// debug("=========================================== shouldConnectWithPrimarySteppedDown")
var replSet = new ReplSetServers( [
new Server( RS.host, RS.ports[1], { auto_reconnect: true } ),
new Server( RS.host, RS.ports[0], { auto_reconnect: true } ),
new Server( RS.host, RS.ports[2], { auto_reconnect: true } )
],
{rs_name:RS.name}
);
// Step down primary server

@@ -164,8 +180,10 @@ RS.stepDownPrimary(function(err, result) {

ensureConnection(test, retries, function(err, p_db) {
if(err != null) debug("shouldConnectWithPrimarySteppedDown :: " + inspect(err));
test.ok(err == null);
test.equal(true, p_db.serverConfig.isConnected());
p_db.close();
test.done();
new Db('integration_test_', replSet).open(function(err, p_db) {
test.ok(err == null);
test.equal(true, p_db.serverConfig.isConnected());
p_db.close();
test.done();
})
});

@@ -176,3 +194,2 @@ });

shouldConnectWithThirdNodeKilled : function(test) {
// debug("=========================================== shouldConnectWithThirdNodeKilled")
RS.getNodeFromPort(RS.ports[2], function(err, node) {

@@ -194,8 +211,9 @@ if(err != null) debug("shouldConnectWithThirdNodeKilled :: " + inspect(err));

ensureConnection(test, retries, function(err, p_db) {
if(err != null) debug("shouldConnectWithThirdNodeKilled :: " + inspect(err));
test.ok(err == null);
test.equal(true, p_db.serverConfig.isConnected());
new Db('integration_test_', replSet).open(function(err, p_db) {
test.ok(err == null);
test.equal(true, p_db.serverConfig.isConnected());
p_db.close();
test.done();
p_db.close();
test.done();
})
});

@@ -207,4 +225,4 @@ });

shouldConnectWithSecondaryNodeKilled : function(test) {
// debug("=========================================== shouldConnectWithSecondaryNodeKilled")
RS.killSecondary(function(node) {
// Replica configuration

@@ -220,4 +238,11 @@ var replSet = new ReplSetServers( [

var db = new Db('integration_test_', replSet);
// Print any errors
db.on("error", function(err) {
console.log("============================= caught error")
console.dir(err)
if(err.stack != null) console.log(err.stack)
test.done();
})
db.open(function(err, p_db) {
if(err != null) debug("shouldConnectWithSecondaryNodeKilled :: " + inspect(err));
test.ok(err == null);

@@ -227,3 +252,3 @@ test.equal(true, p_db.serverConfig.isConnected());

// Close and cleanup
db.close();
p_db.close();
test.done();

@@ -235,3 +260,2 @@ })

shouldConnectWithPrimaryNodeKilled : function(test) {
// debug("=========================================== shouldConnectWithPrimaryNodeKilled")
RS.killPrimary(function(node) {

@@ -246,13 +270,8 @@ // Replica configuration

);
var db = new Db('integration_test_', replSet);
ensureConnection(test, retries, function(err, p_db) {
if(err != null) debug("shouldConnectWithPrimaryNodeKilled :: " + inspect(err));
test.ok(err == null);
test.equal(true, p_db.serverConfig.isConnected());
p_db.close();
if(err != null && err.stack != null) console.log(err.stack)
test.done();
});
// })
});
});

@@ -262,3 +281,2 @@ },

shouldCorrectlyBeAbleToUsePortAccessors : function(test) {
// debug("=========================================== shouldCorrectlyBeAbleToUsePortAccessors")
// Replica configuration

@@ -279,3 +297,3 @@ var replSet = new ReplSetServers( [

db.close();
p_db.close();
test.done();

@@ -286,3 +304,2 @@ })

shouldCorrectlyConnect: function(test) {
// debug("=========================================== shouldCorrectlyConnect")
// Replica configuration

@@ -305,12 +322,12 @@ var replSet = new ReplSetServers( [

if(err != null) debug("shouldCorrectlyConnect :: " + inspect(err));
test.notEqual(null, primary);
test.equal(primary, p_db.serverConfig.primary.host + ":" + p_db.serverConfig.primary.port);
test.equal(primary, p_db.serverConfig.primary.host + ":" + p_db.serverConfig.primary.port);
// Perform tests
RS.secondaries(function(err, items) {
if(err != null) debug("shouldCorrectlyConnect :: " + inspect(err));
// Test if we have the right secondaries
test.deepEqual(items.sort(), p_db.serverConfig.secondaries.map(function(item) {
test.deepEqual(items.sort(), p_db.serverConfig.allSecondaries.map(function(item) {
return item.host + ":" + item.port;

@@ -322,7 +339,6 @@ }).sort());

if(err != null) debug("shouldCorrectlyConnect :: " + inspect(err));
test.deepEqual(items.sort(), p_db.serverConfig.arbiters.map(function(item) {
return item.host + ":" + item.port;
}).sort());
// Force new instance

@@ -332,5 +348,4 @@ var db2 = new Db('integration_test_', replSet );

if(err != null) debug("shouldCorrectlyConnect :: " + inspect(err));
test.equal(true, p_db2.serverConfig.isConnected());
// Close top instance

@@ -346,2 +361,8 @@ db.close();

},
noGlobalsLeaked : function(test) {
var leaks = gleak.detectNew();
test.equal(0, leaks.length, "global var leak detected: " + leaks.join(', '));
test.done();
}
})
var noReplicasetStart = process.env['NO_REPLICASET_START'] != null ? true : false;
var testCase = require('../../deps/nodeunit').testCase,
debug = require('util').debug
debug = require('util').debug,
inspect = require('util').inspect,
gleak = require('../../tools/gleak'),
ReplicaSetManager = require('../tools/replica_set_manager').ReplicaSetManager,

@@ -14,2 +15,3 @@ Db = require('../../lib/mongodb').Db,

var retries = 120;
var RS = RS == null ? null : RS;

@@ -29,2 +31,11 @@ var ensureConnection = function(test, numberOfTries, callback) {

var db = new Db('integration_test_', replSet);
// Print any errors
db.on("error", function(err) {
console.log("============================= ensureConnection caught error")
console.dir(err)
if(err != null && err.stack != null) console.log(err.stack)
db.close();
})
// Open the db
db.open(function(err, p_db) {

@@ -65,5 +76,4 @@ if(err != null) {

RS.restartKilledNodes(function(err, result) {
if(err != null) throw err;
callback();
})
callback();
});
},

@@ -85,7 +95,5 @@

db.open(function(err, p_db) {
// debug("====================================================== 1")
if(err != null) debug("shouldRetrieveCorrectCountAfterInsertionReconnect :: " + inspect(err));
// if(err != null) debug("shouldRetrieveCorrectCountAfterInsertionReconnect :: " + inspect(err));
// Drop collection on replicaset
p_db.dropCollection('testsets', function(err, r) {
// debug("====================================================== 2")
if(err != null) debug("shouldRetrieveCorrectCountAfterInsertionReconnect :: " + inspect(err));

@@ -95,3 +103,2 @@

p_db.createCollection('testsets', function(err, collection) {
// debug("====================================================== 3")
if(err != null) debug("shouldRetrieveCorrectCountAfterInsertionReconnect :: " + inspect(err));

@@ -101,3 +108,2 @@

collection.insert({a:20}, {safe: {w:2, wtimeout: 10000}}, function(err, r) {
// debug("====================================================== 4")
if(err != null) debug("shouldRetrieveCorrectCountAfterInsertionReconnect :: " + inspect(err));

@@ -107,3 +113,2 @@

collection.count(function(err, c) {
// debug("====================================================== 5")
if(err != null) debug("shouldRetrieveCorrectCountAfterInsertionReconnect :: " + inspect(err));

@@ -119,3 +124,2 @@

RS.killPrimary(function(node) {
// debug("====================================================== 6")

@@ -125,3 +129,2 @@ // Ensure valid connection

ensureConnection(test, retries, function(err, p_db) {
// debug("====================================================== 7")
if(err != null) debug("shouldRetrieveCorrectCountAfterInsertionReconnect :: " + inspect(err));

@@ -133,11 +136,8 @@

p_db.collection('testsets', function(err, collection) {
// debug("====================================================== 8")
if(err != null) debug("shouldRetrieveCorrectCountAfterInsertionReconnect :: " + inspect(err));
collection.insert({a:30}, {safe:true}, function(err, r) {
// debug("====================================================== 9")
if(err != null) debug("shouldRetrieveCorrectCountAfterInsertionReconnect :: " + inspect(err));
collection.insert({a:40}, {safe:true}, function(err, r) {
// debug("====================================================== 10")
if(err != null) debug("shouldRetrieveCorrectCountAfterInsertionReconnect :: " + inspect(err));

@@ -147,3 +147,2 @@

collection.count(function(err, c) {
// debug("====================================================== 11")
if(err != null) debug("shouldRetrieveCorrectCountAfterInsertionReconnect :: " + inspect(err));

@@ -167,3 +166,9 @@

})
}
},
noGlobalsLeaked : function(test) {
var leaks = gleak.detectNew();
test.equal(0, leaks.length, "global var leak detected: " + leaks.join(', '));
test.done();
}
})

@@ -170,0 +175,0 @@

var noReplicasetStart = process.env['NO_REPLICASET_START'] != null ? true : false;
var testCase = require('../../deps/nodeunit').testCase,
debug = require('util').debug
debug = require('util').debug,
inspect = require('util').inspect,
gleak = require('../../tools/gleak'),
ReplicaSetManager = require('../tools/replica_set_manager').ReplicaSetManager,

@@ -15,6 +16,5 @@ Db = require('../../lib/mongodb').Db,

var retries = 120;
// var RS = null;
var RS = RS == null ? null : RS;
var ensureConnection = function(test, numberOfTries, callback) {
// debug("=========================================== ensureConnection::" + numberOfTries)
// Replica configuration

@@ -32,2 +32,11 @@ var replSet = new ReplSetServers( [

var db = new Db('integration_test_', replSet);
// Print any errors
db.on("error", function(err) {
console.log("============================= ensureConnection caught error")
console.dir(err)
if(err != null && err.stack != null) console.log(err.stack)
db.close();
})
// Open the db
db.open(function(err, p_db) {

@@ -52,7 +61,4 @@ if(err != null) {

serversUp = true;
// RS = new ReplicaSetManager({retries:120, arbiter_count:0, passive_count:1});
RS = new ReplicaSetManager({retries:120});
RS = new ReplicaSetManager({retries:120, passive_count:1, arbiter_count:1});
RS.startSet(true, function(err, result) {
if(err != null) throw err;
// Finish setup
callback();

@@ -62,5 +68,4 @@ });

RS.restartKilledNodes(function(err, result) {
if(err != null) throw err;
callback();
})
})
}

@@ -71,7 +76,6 @@ },

RS.restartKilledNodes(function(err, result) {
if(err != null) throw err;
callback();
})
callback();
});
},
shouldCorrectlyWaitForReplicationToServersOnInserts : function(test) {

@@ -102,4 +106,5 @@ // debug("=========================================== shouldWorkCorrectlyWithInserts")

collection.insert({a:20}, {safe: {w:2, wtimeout: 10000}}, function(err, r) {
test.equal(null, err);
test.equal(null, err);
test.done();
p_db.close();
});

@@ -110,4 +115,4 @@ });

},
shouldCorrectlyWaitForReplicationToServersOnInserts : function(test) {
shouldCorrectlyThrowTimeoutForReplicationToServersOnInserts : function(test) {
// debug("=========================================== shouldWorkCorrectlyWithInserts")

@@ -140,2 +145,3 @@ // Replica configuration

test.done();
p_db.close();
});

@@ -176,2 +182,3 @@ });

test.done();
p_db.close();
})

@@ -195,5 +202,20 @@ });

// Insert some data
var db = new Db('integration_test_', replSet);
// Print any errors
db.on("error", function(err) {
console.log("============================= ensureConnection caught error")
console.dir(err)
if(err != null && err.stack != null) console.log(err.stack)
db.close();
})
var first = false;
// Open db
db.open(function(err, p_db) {
if(first) return
first = true
// Check if we got an error

@@ -214,13 +236,22 @@ if(err != null) debug("shouldWorkCorrectlyWithInserts :: " + inspect(err));

// Kill the primary
RS.killPrimary(function(node) {
RS.killPrimary(2, {killNodeWaitTime:10}, function(node) {
// Attempt insert (should fail)
collection.insert({a:30}, {safe: {w:2, wtimeout: 10000}}, function(err, r) {
test.ok(err != null)
// test.done();
if(err != null) {
collection.insert({a:30}, {safe: true}, function(err, r) {
collection.insert({a:40}, {safe: true}, function(err, r) {
// Peform a count
collection.count(function(err, count) {
test.equal(2, count);
p_db.close();
test.done();
});
});
} else {
console.log("+++++++++++++++++++++++++++++++++++++++++++ FAILURE")
p_db.close();
test.ok(false)
}

@@ -249,2 +280,10 @@ });

var db = new Db('integration_test_', replSet);
// Print any errors
db.on("error", function(err) {
console.log("============================= ensureConnection caught error")
console.dir(err)
if(err != null && err.stack != null) console.log(err.stack)
db.close();
})
// Open db
db.open(function(err, p_db) {

@@ -266,3 +305,3 @@ // Check if we got an error

// Kill the primary
RS.killPrimary(function(node) {
RS.killPrimary(2, {killNodeWaitTime:1}, function(node) {
// Ok let's execute same query a couple of times

@@ -272,9 +311,3 @@ collection.find({}).toArray(function(err, items) {

// debug(" 1 =============================== err :: " + inspect(err))
// debug(inspect(items))
collection.find({}).toArray(function(err, items) {
// debug(" 2 =============================== err :: " + inspect(err))
// debug(inspect(items))
test.ok(err == null);

@@ -284,8 +317,5 @@ test.equal(1, items.length);

collection.find({}).toArray(function(err, items) {
// debug(" 2 =============================== err :: " + inspect(err))
// debug(inspect(items))
test.ok(err == null);
test.equal(1, items.length);
p_db.close();
test.done();

@@ -341,3 +371,2 @@ });

RS.killPrimary(function(node) {
// Ensure valid connection

@@ -358,32 +387,29 @@ // Do inserts

var group = this.group();
collection.save({a:30}, {safe:true}, group());
collection.save({a:40}, {safe:true}, group());
collection.save({a:50}, {safe:true}, group());
collection.save({a:60}, {safe:true}, group());
collection.save({a:70}, {safe:true}, group());
collection.save({a:30}, {safe:{w:2, wtimeout: 10000}}, group());
collection.save({a:40}, {safe:{w:2, wtimeout: 10000}}, group());
collection.save({a:50}, {safe:{w:2, wtimeout: 10000}}, group());
collection.save({a:60}, {safe:{w:2, wtimeout: 10000}}, group());
collection.save({a:70}, {safe:{w:2, wtimeout: 10000}}, group());
},
function finishUp(err, values) {
function finishUp(err, values) {
// Restart the old master and wait for the sync to happen
RS.restartKilledNodes(function(err, result) {
if(err != null) debug("shouldWorkCorrectlyWithInserts :: " + inspect(err));
if(err != null) throw err;
if(err != null) debug("shouldWorkCorrectlyWithInserts :: " + inspect(err));
// Contains the results
var results = [];
// Just wait for the results
setTimeout(function() {
// Ensure the connection
ensureConnection(test, retries, function(err, p_db) {
if(err != null) debug("shouldWorkCorrectlyWithInserts :: " + inspect(err));
// Get the collection
p_db.collection('testsets', function(err, collection) {
if(err != null) debug("shouldWorkCorrectlyWithInserts :: " + inspect(err));
collection.find().each(function(err, item) {
if(err != null) debug("shouldWorkCorrectlyWithInserts :: " + inspect(err));
if(item == null) {

@@ -397,13 +423,13 @@ // Ensure we have the correct values

});
// Run second check
collection.save({a:80}, {safe:true}, function(err, r) {
if(err != null) debug("shouldWorkCorrectlyWithInserts :: " + inspect(err));
collection.find().toArray(function(err, items) {
if(err != null) debug("shouldWorkCorrectlyWithInserts :: " + inspect(err));
// Ensure we have the correct values
test.equal(7, items.length);
[20, 30, 40, 50, 60, 70, 80].forEach(function(a) {

@@ -414,3 +440,3 @@ test.equal(1, items.filter(function(element) {

});
p_db.close();

@@ -426,3 +452,3 @@ test.done();

});
}, 1000);
}, 5000);
})

@@ -440,3 +466,9 @@ }

})
}
},
noGlobalsLeaked : function(test) {
var leaks = gleak.detectNew();
test.equal(0, leaks.length, "global var leak detected: " + leaks.join(', '));
test.done();
}
})

@@ -443,0 +475,0 @@

var noReplicasetStart = process.env['NO_REPLICASET_START'] != null ? true : false;
var testCase = require('../../deps/nodeunit').testCase,
debug = require('util').debug
debug = require('util').debug,
inspect = require('util').inspect,
gleak = require('../../tools/gleak'),
ReplicaSetManager = require('../tools/replica_set_manager').ReplicaSetManager,

@@ -15,3 +16,3 @@ Db = require('../../lib/mongodb').Db,

var retries = 120;
// var RS = null;
var RS = RS == null ? null : RS;

@@ -25,3 +26,3 @@ var ensureConnection = function(test, numberOfTries, callback) {

],
{rs_name:RS.name, read_secondary:true}
{rs_name:RS.name}
);

@@ -31,3 +32,12 @@

var db = new Db('ruby-test-db', replSet);
var db = new Db('integration_test_', replSet);
// Print any errors
db.on("error", function(err) {
console.log("============================= ensureConnection caught error")
console.dir(err)
if(err != null && err.stack != null) console.log(err.stack)
db.close();
})
// Open the db
db.open(function(err, p_db) {

@@ -52,3 +62,3 @@ if(err != null) {

serversUp = true;
RS = new ReplicaSetManager({retries:120});
RS = new ReplicaSetManager({retries:120, passive_count:1, arbiter_count:1});
RS.startSet(true, function(err, result) {

@@ -133,89 +143,34 @@ if(err != null) throw err;

);
// Insert some data
var db = new Db('ruby-test-db', replSet);
var db = new Db('integration_test_', replSet);
db.open(function(err, p_db) {
if(err != null) debug("shouldReadPrimary :: " + inspect(err));
p_db.createCollection("test-sets", {safe:{w:3, wtimeout:10000}}, function(err, collection) {
p_db.createCollection("testsets", {safe:{w:2, wtimeout:10000}}, function(err, collection) {
if(err != null) debug("shouldReadPrimary :: " + inspect(err));
Step(
function inserts() {
var group = this.group();
collection.save({a:20}, group());
collection.save({a:30}, group());
collection.save({a:40}, group());
},
function done(err, values) {
if(err != null) debug("shouldReadPrimary :: " + inspect(err));
var results = [];
retryEnsure(60, function(done) {
results = [];
collection.find().each(function(err, item) {
if(item == null) {
var correct = 0;
// Check all the values
var r = [20, 30, 40];
for(var i = 0; i < r.length; i++) {
correct += results.filter(function(element) {
return element.a == r[i];
}).length;
}
return correct == 3 ? done(true) : done(false);
} else {
results.push(item);
}
});
}, function(err, result) {
if(err != null) debug("shouldReadPrimary :: " + inspect(err));
test.ifError(err);
// Ensure replication happened in time
setTimeout(function() {
// Kill the primary
RS.killPrimary(function(node) {
//
// Retry again to read the docs with primary dead
retryEnsure(60, function(done) {
results = [];
collection.find().each(function(err, item) {
if(err != null) debug("shouldReadPrimary :: " + inspect(err));
if(item == null) {
var correct = 0;
// Check all the values
var r = [20, 30, 40];
for(var i = 0; i < r.length; i++) {
correct += results.filter(function(element) {
return element.a == r[i];
}).length;
}
return correct == 3 ? done(true) : done(false);
} else {
results.push(item);
}
});
}, function(err, result) {
// Check if we get a correct count
collection.count(function(err, count) {
test.ifError(err);
test.equal(3, count)
test.done();
p_db.close();
});
})
});
}, 2000);
})
}
);
collection.insert([{a:20}, {a:30}, {a:40}], {safe:{w:2, wtimeout:10000}}, function(err, result) {
// Ensure replication happened in time
setTimeout(function() {
// Kill the primary
RS.killPrimary(function(node) {
// Do a collection find
collection.find().toArray(function(err, items) {
test.equal(null, err);
test.equal(3, items.length);
test.done();
});
});
}, 2000);
})
});
})
}
},
noGlobalsLeaked : function(test) {
var leaks = gleak.detectNew();
test.equal(0, leaks.length, "global var leak detected: " + leaks.join(', '));
test.done();
}
})

@@ -222,0 +177,0 @@

var noReplicasetStart = process.env['NO_REPLICASET_START'] != null ? true : false;
var testCase = require('../../deps/nodeunit').testCase,
debug = require('util').debug
debug = require('util').debug,
inspect = require('util').inspect,
gleak = require('../../tools/gleak'),
ReplicaSetManager = require('../tools/replica_set_manager').ReplicaSetManager,

@@ -15,2 +16,3 @@ Db = require('../../lib/mongodb').Db,

var retries = 120;
var RS = RS == null ? null : RS;

@@ -79,3 +81,3 @@ var ensureConnection = function(test, numberOfTries, callback) {

new Server( RS.host, RS.ports[0], { auto_reconnect: true } ),
new Server( RS.host, RS.ports[2], { auto_reconnect: true } )
// new Server( RS.host, RS.ports[2], { auto_reconnect: true } )
],

@@ -99,3 +101,2 @@ {rs_name:RS.name, read_secondary:false}

collection.insert({a:20}, {safe: {w:1, wtimeout: 10000}}, function(err, r) {
// Execute a findAndModify

@@ -112,2 +113,8 @@ collection.findAndModify({'a':20}, [['a', 1]], {'$set':{'b':3}}, {'new':true, safe: {w:7, wtimeout: 10000}}, function(err, updated_doc) {

},
noGlobalsLeaked : function(test) {
var leaks = gleak.detectNew();
test.equal(0, leaks.length, "global var leak detected: " + leaks.join(', '));
test.done();
}
})

@@ -114,0 +121,0 @@

@@ -7,2 +7,3 @@ var mongodb = process.env['TEST_NATIVE'] != null ? require('../lib/mongodb').native() : require('../lib/mongodb').pure();

nodeunit = require('../deps/nodeunit'),
gleak = require('../tools/gleak'),
Db = mongodb.Db,

@@ -116,3 +117,9 @@ Cursor = mongodb.Cursor,

});
},
},
noGlobalsLeaked : function(test) {
var leaks = gleak.detectNew();
test.equal(0, leaks.length, "global var leak detected: " + leaks.join(', '));
test.done();
}
})

@@ -119,0 +126,0 @@

@@ -29,3 +29,3 @@ var debug = require('util').debug,

this.passiveCount = options["passive_count"] != null ? options["passive_count"] : 1;
this.primaryCount = 1;
this.primaryCount = options["primary_count"] != null ? options["primary_count"] : 1;
this.keyPath = [process.cwd(), "test", "tools", "keyfile.txt"].join("/");

@@ -39,15 +39,4 @@ fs.chmodSync(this.keyPath, 0600);

// Keeps all the mongod instances
this.mongods = {};
var self = this;
// Add a handler for errors that bubble up all the way
// process.on('uncaughtException', function (err) {
// debug("============================================================= uncaught Exception")
// debug(inspect(err))
// // Kill all mongod servers and cleanup before exiting
// self.killAll(function() {
// // Force exit
// process.exit();
// })
// });
}

@@ -78,2 +67,3 @@

var members = status["members"];
// Get the correct state memebers

@@ -136,3 +126,2 @@ var nodes = members.filter(function(value) {

// Ensure all the members are up
// process.stdout.write("** Ensuring members are up...");
debug("** Ensuring members are up...");

@@ -153,2 +142,3 @@ // Let's ensure everything is up

var self = this;
var done = false;
// Get master connection

@@ -172,4 +162,8 @@ self.getConnection(function(err, connection) {

} else {
self.numberOfInitiateRetries = 0;
callback(null, null);
// Make sure we only do this once, even if some messages are late
if(!done) {
done = true;
self.numberOfInitiateRetries = 0;
callback(null, null);
}
}

@@ -195,2 +189,5 @@ });

// Set priority off server in config
var priority = typeof fields === 'object' ? fields.priority : null;
// Add extra fields provided

@@ -201,9 +198,4 @@ for(var name in fields) {

// debug("================================================== initNode")
// debug(inspect(this.mongods[n]));
// Perform cleanup of directories
exec("rm -rf " + self.mongods[n]["db_path"], function(err, stdout, stderr) {
// debug("======================================== err1::" + err)
if(err != null) return callback(err, null);

@@ -213,8 +205,3 @@

exec("mkdir -p " + self.mongods[n]["db_path"], function(err, stdout, stderr) {
// debug("======================================== err2::" + err)
if(err != null) return callback(err, null);
// debug("= ======================================= start1::" + self.mongods[n]["start"])
self.mongods[n]["start"] = self.startCmd(n);

@@ -224,5 +211,11 @@ self.start(n, function() {

var member = {"_id": n, "host": self.host + ":" + self.mongods[n]["port"]};
// Set it to arbiter if it's been passed
if(self.mongods[n]['arbiterOnly']) {
member['arbiterOnly'] = true;
}
// Set priority level if it's defined
if(priority != null) {
member['priority'] = priority;
}
// Push member to config
self.config["members"].push(member);

@@ -242,3 +235,3 @@ // Return

ReplicaSetManager.prototype.kill = function(node, signal, callback) {
ReplicaSetManager.prototype.kill = function(node, signal, options, callback) {
var self = this;

@@ -249,2 +242,5 @@ // Unpack callback and variables

signal = args.length ? args.shift() : 2;
options = args.length ? args.shift() : {};
// kill node wait time
var killNodeWaitTime = options.killNodeWaitTime == null ? self.killNodeWaitTime : options.killNodeWaitTime;

@@ -256,6 +252,6 @@ debug("** Killing node with pid " + this.mongods[node]["pid"] + " at port " + this.mongods[node]['port']);

function (error, stdout, stderr) {
console.log('stdout: ' + stdout);
console.log('stderr: ' + stderr);
debug('stdout: ' + stdout);
debug('stderr: ' + stderr);
if (error !== null) {
console.log('exec error: ' + error);
debug('exec error: ' + error);
}

@@ -265,7 +261,7 @@

// Wait for 5 seconds to give the server time to die a proper death
setTimeout(callback, self.killNodeWaitTime);
setTimeout(callback, killNodeWaitTime);
});
}
ReplicaSetManager.prototype.killPrimary = function(signal, callback) {
ReplicaSetManager.prototype.killPrimary = function(signal, options, callback) {
var self = this;

@@ -276,11 +272,17 @@ // Unpack callback and variables

signal = args.length ? args.shift() : 2;
options = args.length ? args.shift() : {};
var done = false;
this.getNodeWithState(1, function(err, node) {
if(err != null) return callback(err, null);
if(!done) {
// Ensure no double callbacks due to later scheduled connections returning
done = true;
if(err != null) return callback(err, null);
// Kill process and return node reference
self.kill(node, signal, function() {
// Wait for a while before passing back
callback(null, node);
})
// Kill process and return node reference
self.kill(node, signal, options, function() {
// Wait for a while before passing back
callback(null, node);
})
}
});

@@ -291,9 +293,14 @@ }

var self = this;
var done = false;
this.getNodeWithState(2, function(err, node) {
if(err != null) return callback(err, null);
// Kill process and return node reference
self.kill(node, function() {
callback(null, node);
})
if(!done) {
// Ensure no double callbacks due to later scheduled connections returning
done = true;
if(err != null) return callback(err, null);
// Kill process and return node reference
self.kill(node, function() {
callback(null, node);
})
}
});

@@ -304,11 +311,15 @@ }

var self = this;
// Get the primary node
this.getNodeWithState(1, function(err, primary) {
// Return error
if(err) return callback(err, null);
if(primary == null) return callback(new Error("No primary found"), null);
// Get the connection for the primary
self.getConnection(primary, function(err, connection) {
// Return any errors
if(err) return callback(err, null);
// Closes the connection so never gets a response
// Execute stepdown process
connection.admin().command({"replSetStepDown": 90});
// Call back
return callback(null, null);
// Return the callback
return callback(null, connection);
});

@@ -352,67 +363,166 @@ });

var self = this;
var numberOfInitiateRetries = this.retries;
var done = false;
// Write out the ensureUp
// process.stdout.write(".");
if(!self.up) process.stdout.write(".");
// Retry check for server up sleeping inbetween
self.retriedConnects = 0;
// Attemp to retrieve a connection
self.getConnection(function(err, connection) {
// If we have an error or no connection object retry
if(err != null || connection == null) {
setTimeout(function() {
self.ensureUpRetries++;
self.ensureUp(callback);
}, 1000)
// Return
return;
// Actual function doing testing
var ensureUpFunction = function() {
if(!done) {
if(!self.up) process.stdout.write(".");
// Attemp to retrieve a connection
self.getConnection(function(err, connection) {
// Adjust the number of retries
numberOfInitiateRetries = numberOfInitiateRetries - 1
// If have no more retries stop
if(numberOfInitiateRetries == 0) {
// Set that we are done
done = true;
// perform callback
return callback(new Er=ror("Servers did not come up again"), null);
}
// We have a connection, execute command and update server object
if(err == null && connection != null) {
// Check repl set get status
connection.admin().command({"replSetGetStatus": 1}, function(err, object) {
/// Get documents
var documents = object.documents;
// Get status object
var status = documents[0];
// If no members set
if(status["members"] == null || err != null) {
// if we have a connection force close it
if(connection != null) connection.close();
// Ensure we perform enough retries
if(self.ensureUpRetries >= self.retries) {
// Set that we are done
done = true;
// if we have a connection force close it
if(connection != null) connection.close();
// Return error
return callback(new Error("Operation Failure"), null);
} else {
// Execute function again
setTimeout(ensureUpFunction, 1000);
}
} else {
// Establish all health member
var healthyMembers = status.members.filter(function(element) {
return element["health"] == 1 && [1, 2, 7].indexOf(element["state"]) != -1
});
var stateCheck = status["members"].filter(function(element, indexOf, array) {
return element["state"] == 1;
});
if(healthyMembers.length == status.members.length && stateCheck.length > 0) {
// Set that we are done
done = true;
// if we have a connection force close it
if(connection != null) connection.close();
// process.stdout.write("all members up! \n\n");
if(!self.up) process.stdout.write("all members up!\n\n")
self.up = true;
return callback(null, status);
} else {
// if we have a connection force close it
if(connection != null) connection.close();
// Ensure we perform enough retries
if(self.ensureUpRetries >= self.retries) {
// Set that we are done
done = true;
// if we have a connection force close it
if(connection != null) connection.close();
// Return error
return callback(new Error("Operation Failure"), null);
} else {
// Execute function again
setTimeout(ensureUpFunction, 1000);
}
}
}
});
}
});
}
// Check repl set get status
connection.admin().command({"replSetGetStatus": 1}, function(err, object) {
/// Get documents
var documents = object.documents;
// Get status object
var status = documents[0];
}
// If no members set
if(status["members"] == null || err != null) {
// Ensure we perform enough retries
if(self.ensureUpRetries < self.retries) {
setTimeout(function() {
self.ensureUpRetries++;
self.ensureUp(callback);
}, 1000)
} else {
return callback(new Error("Operation Failure"), null);
}
} else {
// Establish all health member
var healthyMembers = status.members.filter(function(element) {
return element["health"] == 1 && [1, 2, 7].indexOf(element["state"]) != -1
});
var stateCheck = status["members"].filter(function(element, indexOf, array) {
return element["state"] == 1;
});
if(healthyMembers.length == status.members.length && stateCheck.length > 0) {
// process.stdout.write("all members up! \n\n");
if(!self.up) process.stdout.write("all members up!\n\n")
self.up = true;
return callback(null, status);
} else {
// Ensure we perform enough retries
if(self.ensureUpRetries < self.retries) {
setTimeout(function() {
self.ensureUpRetries++;
self.ensureUp(callback);
}, 1000)
} else {
return callback(new Error("Operation Failure"), null);
}
}
}
});
});
// Execute the first function call
ensureUpFunction();
// // Write out the ensureUp
// // process.stdout.write(".");
// if(!self.up) process.stdout.write(".");
// // Retry check for server up sleeping inbetween
// self.retriedConnects = 0;
// // Attemp to retrieve a connection
// self.getConnection(function(err, connection) {
// // If we have an error or no connection object retry
// if(err != null || connection == null) {
// // if we have a connection force close it
// if(connection != null) connection.close();
// // Retry the connection
// setTimeout(function() {
// self.ensureUpRetries++;
// self.ensureUp(callback);
// }, 1000)
// // Return
// return;
// }
//
// // Check repl set get status
// connection.admin().command({"replSetGetStatus": 1}, function(err, object) {
// /// Get documents
// var documents = object.documents;
// // Get status object
// var status = documents[0];
//
// // If no members set
// if(status["members"] == null || err != null) {
// // if we have a connection force close it
// if(connection != null) connection.close();
// // Ensure we perform enough retries
// if(self.ensureUpRetries < self.retries) {
// setTimeout(function() {
// self.ensureUpRetries++;
// self.ensureUp(callback);
// }, 1000)
// } else {
// // if we have a connection force close it
// if(connection != null) connection.close();
// // Return error
// return callback(new Error("Operation Failure"), null);
// }
// } else {
// // Establish all health member
// var healthyMembers = status.members.filter(function(element) {
// return element["health"] == 1 && [1, 2, 7].indexOf(element["state"]) != -1
// });
//
// var stateCheck = status["members"].filter(function(element, indexOf, array) {
// return element["state"] == 1;
// });
//
// if(healthyMembers.length == status.members.length && stateCheck.length > 0) {
// // if we have a connection force close it
// if(connection != null) connection.close();
// // process.stdout.write("all members up! \n\n");
// if(!self.up) process.stdout.write("all members up!\n\n")
// self.up = true;
// return callback(null, status);
// } else {
// // if we have a connection force close it
// if(connection != null) connection.close();
// // Ensure we perform enough retries
// if(self.ensureUpRetries < self.retries) {
// setTimeout(function() {
// self.ensureUpRetries++;
// self.ensureUp(callback);
// }, 1000)
// } else {
// return callback(new Error("Operation Failure"), null);
// }
// }
// }
// });
// });
}

@@ -428,16 +538,17 @@

Step(
// Start all nodes
function start() {
var group = this.group();
// Start all nodes
for(var i = 0; i < nodes.length; i++) {
self.start(nodes[i], group());
var numberOfNodes = nodes.length;
if(numberOfNodes == 0) return self.ensureUp(callback);
// Restart all the number of nodes
for(var i = 0; i < numberOfNodes; i++) {
// Start the process
self.start(nodes[i], function(err, result) {
// Adjust the number of nodes we are starting
numberOfNodes = numberOfNodes - 1;
if(numberOfNodes === 0) {
self.ensureUp(callback);
}
},
function finished() {
self.ensureUp(callback);
}
)
});
}
}

@@ -447,2 +558,6 @@

var self = this;
// Function done
var done = false;
// Number of retries
var numberOfRetries = self.retries;
// Unpack callback and variables

@@ -453,3 +568,3 @@ var args = Array.prototype.slice.call(arguments, 0);

if(node == null) {
if(node == null) {
var keys = Object.keys(this.mongods);

@@ -465,22 +580,35 @@ for(var i = 0; i < keys.length; i++) {

}
// Fire up the connection to check if we are running
// var db = new Db('node-mongo-blog', new Server(host, port, {}), {native_parser:true});
var connection = new Db("replicaset_test", new Server(this.host, this.mongods[node]["port"], {}));
connection.open(function(err, connection) {
// We need to retry if we have not finished up the number of retries
if(err != null && self.retriedConnects < self.retries) {
// Sleep for a second then retry
setTimeout(function() {
// Update retries
self.retriedConnects++;
// Perform anothe reconnect
self.getConnection(node, callback);
}, 1000)
} else if(err != null && self.retriedConnects >= self.retries){
callback(new Error("Failed to reconnect"), null);
} else {
callback(null, connection);
}
})
// Get the node
if(self.mongods[node] != null) {
var intervalId = setInterval(function() {
var connection = new Db("replicaset_test", new Server(self.host, self.mongods[node]["port"], {}));
connection.open(function(err, db) {
if(err == null && !done) {
// Set done
done = true;
// Clear interval
clearInterval(intervalId);
// Callback as done
return callback(null, connection);
} else {
// Close the connection
if(connection != null) connection.close();
// Adjust the number of retries
numberOfRetries = numberOfRetries - 1;
// If we have no more retries fail
if(numberOfRetries == 0) {
// Set done
done = true;
// Clear interval
clearInterval(intervalId);
// Callback as done
return callback(new Error("Timed out connecting to primary"), null);
}
}
});
}, 1000);
} else {
callback(new Error("no primary node found to do stepDownPrimary"), null);
}
}

@@ -491,15 +619,13 @@

var self = this;
// Start up mongod process
// debug("======================================================================================= starting process")
// debug(self.mongods[node]["start"])
var mongodb = exec(self.mongods[node]["start"],
function (error, stdout, stderr) {
console.log('stdout: ' + stdout);
console.log('stderr: ' + stderr);
debug('stdout: ' + stdout);
debug('stderr: ' + stderr);
if (error !== null) {
console.log('exec error: ' + error);
debug('exec error: ' + error);
}
});
});
// Wait for a half a second then save the pids

@@ -506,0 +632,0 @@ setTimeout(function() {

@@ -37,2 +37,5 @@ var debug = require('util').debug,

db_path: self.db_path, port: self.port, durable: self.durable, auth:self.auth});
// console.log("----------------------------------------------------------------------- start")
// console.log(startCmd)

@@ -122,3 +125,3 @@ exec(killall ? 'killall mongod' : '', function(err, stdout, stderr) {

// Create boot command
var startCmd = "mongod --logpath '" + options['log_path'] + "' " +
var startCmd = "mongod --noprealloc --logpath '" + options['log_path'] + "' " +
" --dbpath " + options['db_path'] + " --port " + options['port'] + " --fork";

@@ -125,0 +128,0 @@ startCmd = options['durable'] ? startCmd + " --dur" : startCmd;

@@ -17,2 +17,3 @@ var nodeunit = require('../deps/nodeunit'),

{dir: __dirname + "/../test/gridstore", path: "/test/gridstore/"},
{dir: __dirname + "/../test/connection", path: "/test/connection/"},
{dir: __dirname + "/../test/bson", path: "/test/bson/"}];

@@ -19,0 +20,0 @@

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc