Comparing version 0.9.6-23 to 0.9.7-0
0.2.1 / 10-18-2011 | ||
================== | ||
* fixed; package.json dependency versioning | ||
0.2.0 / 10-11-2011 | ||
================== | ||
* added; node v0.5 / v0.6 support | ||
0.1.3 / 09-22-2011 | ||
================== | ||
* use old school node engine format in package.json | ||
0.1.2 / 09-08-2011 | ||
================== | ||
* changed; utilize detectNew in middleware | ||
* updated; docs | ||
0.1.1 / 09-07-2011 | ||
@@ -3,0 +24,0 @@ ================== |
@@ -16,3 +16,3 @@ | ||
exports.version = '0.1.1'; | ||
exports.version = '0.2.1'; | ||
@@ -42,9 +42,6 @@ /** | ||
var known = []; | ||
setTimeout(print, 1000); | ||
function print () { | ||
g.detect().forEach(function (leak) { | ||
if (~known.indexOf(leak)) return; | ||
known.push(leak); | ||
g.detectNew().forEach(function (leak) { | ||
stream.write(format.replace(/%s/, leak) + '\n'); | ||
@@ -84,2 +81,3 @@ }); | ||
// v0.4.x | ||
Gleak.prototype.whitelist = [ | ||
@@ -94,4 +92,24 @@ setTimeout | ||
, global | ||
, GLOBAL | ||
, root | ||
]; | ||
// check for new globals in >= v0.5.x  | ||
var version = process.version.replace(/^v/, '').split('.'); | ||
if ('0' === version[0] && version[1] > 4) { | ||
Gleak.prototype.whitelist.push( | ||
ArrayBuffer | ||
, Int8Array | ||
, Uint8Array | ||
, Int16Array | ||
, Uint16Array | ||
, Int32Array | ||
, Uint32Array | ||
, Float32Array | ||
, Float64Array | ||
, DataView | ||
, 'errno' // node >= v0.5.x hack | ||
) | ||
} | ||
/** | ||
@@ -98,0 +116,0 @@ * Default format. |
@@ -5,3 +5,3 @@ { | ||
"description": "Node global variable leak detector", | ||
"version": "0.1.1", | ||
"version": "0.2.1", | ||
"repository": { | ||
@@ -16,9 +16,9 @@ "type": "git" | ||
"engines": { | ||
"node": "~v0.4.0" | ||
"node": ">=0.4.0" | ||
}, | ||
"dependencies": {}, | ||
"devDependencies": { | ||
"express": "~v2.0.0" | ||
, "expresso": "v0.7.5" | ||
"express": ">=2.0.0" | ||
, "expresso": "0.7.5" | ||
} | ||
} |
# Gleak | ||
Global variable leak detection for Node.js | ||
var gleak = require('gleak'); | ||
var detector = require('gleak')(); | ||
gleak.detect().forEach(function (name) { | ||
detector.detect().forEach(function (name) { | ||
console.warn('found global leak: %s', name); | ||
@@ -14,2 +14,19 @@ }); | ||
## Detectable | ||
As demonstrated, gleak comes with the `detect` method which returns | ||
an array of all found variable leaks. | ||
Oftentimes we want to run the detector many times, progressively  | ||
checking for any new leaks that occurred since we last checked. In | ||
this scenario we can utilize the `detectNew` method. | ||
var detector = require('gleak')(); | ||
x = 1; | ||
detector.detectNew(); // ['x'] | ||
detector.detectNew(); // [] | ||
y = 3; | ||
detector.detectNew(); // ['y'] | ||
## Configurable  | ||
@@ -20,13 +37,31 @@ | ||
var gleak = require('gleak'); | ||
gleak.whitelist.push(app, db); | ||
var gleak = require('gleak')(); | ||
gleak.ignore(app, db); | ||
`gleak.whitelist` is an array that holds all globals we want to ignore. | ||
Push to it or blow it away completely with your own list. | ||
The `gleak.ignore` method allows us to add globals we want to ignore | ||
while safely ignoring duplicates. | ||
`gleak.whitelist` is an array that holds all globals we are ignoring. | ||
You can push to it or blow it away completely with your own list too. | ||
var gleak = require('gleak')(); | ||
gleak.whitelist = [dnode, cluster]; | ||
Changes to your whitelists do not impact any global settings. For example: | ||
var gleak = require('gleak'); | ||
var g1 = gleak(); | ||
var g2 = gleak(); | ||
g1.ignore(myglobal); | ||
g2.whitelist.indexOf(myglobal) === -1; | ||
`g2` does not inherit changes to `g1`'s whitelist.  | ||
## Printable | ||
If you don't want anything fancy and want to quickly dump all | ||
global leaks to your console, just call `print()`. | ||
var gleak = require('gleak')(); | ||
gleak.print(); // prints "Gleak!: leakedVarName" | ||
@@ -33,0 +68,0 @@ |
@@ -12,3 +12,3 @@ | ||
var sys = require('sys'); | ||
var sys = require('util'); | ||
@@ -15,0 +15,0 @@ /** |
@@ -14,3 +14,3 @@ /*! | ||
fs = require('fs'), | ||
sys = require('sys'), | ||
sys = require('util'), | ||
track = require('../track'), | ||
@@ -17,0 +17,0 @@ path = require('path'); |
@@ -14,3 +14,3 @@ /*! | ||
fs = require('fs'), | ||
sys = require('sys'), | ||
sys = require('util'), | ||
path = require('path'), | ||
@@ -17,0 +17,0 @@ AssertionError = require('assert').AssertionError; |
@@ -20,6 +20,7 @@ Collections | ||
db.createCollection(name, callback) | ||
  db.createCollection(name[, options], callback)  | ||
where `name` is the name of the collection and `callback` is a callback function. `db` is the database object. | ||
where `name` is the name of the collection, `options` is a set of configuration parameters, and `callback` is a callback function. `db` is the database object.  | ||
The first parameter for | ||
@@ -34,2 +35,7 @@ the callback is the error object (null if no error) and the second one is the pointer to the newly created | ||
## Collection creation options  | ||
Several options can be passed to the `createCollection` function with the `options` parameter, as shown in the sketch below.  | ||
* `raw` - driver returns documents as bson binary Buffer objects, `default:false` | ||
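A minimal sketch of passing the `raw` option (assuming `db` is an already opened database instance and `logs` is a placeholder collection name):

  db.createCollection('logs', {raw:true}, function(err, collection) {
    if(err) return console.error(err);
    // documents read from this collection are returned as raw bson Buffer objects
    console.log('created collection: ' + collection.collectionName);
  });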
### Collection properties | ||
@@ -84,6 +90,11 @@ | ||
db.collection("name", callback); | ||
  db.collection(name[, options], callback);  | ||
If strict mode is off, then a new collection is created if not already present. | ||
## Collection selection options  | ||
Several options can be passed to the `collection` function with the `options` parameter, as shown in the sketch below.  | ||
* `raw` - driver returns documents as bson binary Buffer objects, `default:false` | ||
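A minimal sketch of selecting a collection with the `raw` option (again assuming `db` is open; the collection name is a placeholder):

  db.collection('logs', {raw:true}, function(err, collection) {
    if(err) return console.error(err);
    // queries against this collection now yield raw bson Buffer objects
  });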
## Renaming collections | ||
@@ -90,0 +101,0 @@ |
@@ -21,5 +21,11 @@ Database | ||
* `auto_reconnect` - to reconnect automatically, default is false | ||
* `poolSize` - specify the number of connections in the pool, default is 1 | ||
* `auto_reconnect` - to reconnect automatically, `default:false` | ||
* `poolSize` - specify the number of connections in the pool `default:1` | ||
* `retryMiliSeconds` - specify the number of milliseconds between connection attempts `default:5000` | ||
* `numberOfRetries` - specify the number of retries for connection attempts `default:3` | ||
* `reaperInterval` - specify the number of milliseconds between each reaper attempt `default:1000` | ||
* `reaperTimeout` - specify the number of milliseconds for timing out callbacks that don't return `default:30000` | ||
* `raw` - driver expects raw bson documents as Buffer objects, `default:false`  | ||
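A minimal sketch of supplying two of these options when opening a connection (host, port and database name are placeholders; only `auto_reconnect` and `poolSize` are shown):

  var mongodb = require('mongodb');
  // placeholder host/port; auto_reconnect and poolSize configure the server connection
  var server = new mongodb.Server('localhost', 27017, {auto_reconnect:true, poolSize:4});
  var db = new mongodb.Db('mydb', server);
  db.open(function(err, db) {
    if(err) return console.error(err);
    // a pool of 4 connections is now available for commands
  });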
## DB options | ||
@@ -70,2 +76,10 @@ | ||
## Sharing the connections over multiple dbs | ||
To share the connection pool across multiple databases, your database instance provides a `db` method  | ||
db_connector.db(name) | ||
This returns a new `db` instance that shares the connections of the previous instance but sends all commands to the database `name`. This allows for better control of resource usage in a multiple-database scenario.  | ||
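For example, a sketch of reusing one pool for a second database (database and collection names are placeholders):

  // `db` is an already opened database instance
  var logsDb = db.db('logs_database');
  // logsDb reuses db's connection pool but targets the logs_database database
  logsDb.collection('entries', function(err, collection) {
    // work with the collection as usual
  });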
## Deleting a database | ||
@@ -72,0 +86,0 @@ |
@@ -82,3 +82,3 @@ GridStore | ||
gs.read([size[, offset]], callback) | ||
gs.read([size], callback) | ||
@@ -88,3 +88,2 @@ where | ||
* `size` is the length of the data to be read | ||
* `offset` is the position to start reading | ||
* `callback` is a callback function with two parameters - error object (if an error occurred) and data (binary string)  | ||
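A minimal sketch of reading from a GridStore file (the filename is a placeholder and `db` is assumed to be open):

  var GridStore = require('mongodb').GridStore;
  // open an existing file for reading
  new GridStore(db, 'file.txt', 'r').open(function(err, gridStore) {
    if(err) return console.error(err);
    // read the first 5 bytes; omit `size` to read the remainder of the file
    gridStore.read(5, function(err, data) {
      console.log(data);
    });
  });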
@@ -131,3 +130,3 @@ | ||
* `db` is the database object | ||
* `filename` is the name of the file to be checked | ||
* `filename` is the name of the file to be checked or a regular expression | ||
* `callback` is a callback function with two parameters - an error object (if an error occurred) and a boolean value indicating if the file exists or not  | ||
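A sketch of the `exist` check, including the regular-expression form introduced here (filenames are placeholders):

  var GridStore = require('mongodb').GridStore;
  // exact filename match
  GridStore.exist(db, 'file.txt', function(err, exists) {
    console.log('file.txt exists: ' + exists);
  });
  // regular expression matched against stored filenames
  GridStore.exist(db, /^file/, function(err, exists) {
    console.log('a file starting with "file" exists: ' + exists);
  });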
@@ -134,0 +133,0 @@ |
@@ -66,2 +66,3 @@ Inserting and updating | ||
* `upsert` - if true and no records match the query, insert `update` as a new record | ||
* `raw` - driver returns updated document as bson binary Buffer, `default:false` | ||
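A sketch of an update using the `upsert` flag (collection, selector and field names are placeholders):

  // inserts the document if no record matches the selector
  collection.update({name:'counter'}, {$inc:{value:1}}, {upsert:true, safe:true}, function(err) {
    if(err) return console.error(err);
  });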
@@ -68,0 +69,0 @@ ### Replacement object |
@@ -20,2 +20,3 @@ Queries | ||
* `options` - defines extra logic (sorting options, paging etc.) | ||
* `raw` - driver returns documents as bson binary Buffer objects, `default:false` | ||
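A sketch of a query using the `raw` option (the selector is a placeholder):

  // documents come back as raw bson Buffer objects instead of parsed objects
  collection.find({active:true}, {raw:true}).toArray(function(err, rawDocs) {
    if(err) return console.error(err);
    // each entry in rawDocs is a Buffer containing one bson document
  });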
@@ -22,0 +23,0 @@ The result for the query is actually a cursor object. This can be used directly or converted to an array. |
@@ -14,4 +14,7 @@ var sys = require('util'), | ||
DBRef = require('../../lib/mongodb/bson/bson').DBRef, | ||
Symbol = require('../../lib/mongodb/bson/bson').Symbol, | ||
Double = require('../../lib/mongodb/bson/bson').Double, | ||
Timestamp = require('../../lib/mongodb/bson/bson').Timestamp, | ||
assert = require('assert'); | ||
var Long2 = require('./bson').Long, | ||
@@ -21,2 +24,5 @@ ObjectID2 = require('./bson').ObjectID, | ||
Code2 = require('./bson').Code, | ||
Symbol2 = require('./bson').Symbol, | ||
Double2 = require('./bson').Double, | ||
Timestamp2 = require('./bson').Timestamp, | ||
DBRef2 = require('./bson').DBRef; | ||
@@ -30,2 +36,34 @@ | ||
// | ||
// Assert correct toJSON | ||
// | ||
var binary1 = new Binary(new Buffer('00000000000000000000000000000000')); | ||
var binary2 = new Binary2(new Buffer('00000000000000000000000000000000')); | ||
assert.equal(JSON.stringify(binary1), JSON.stringify(binary2)); | ||
var objectId = new ObjectID(); | ||
var dbref1 = new DBRef('test', objectId, 'db'); | ||
var dbref2 = new DBRef2('test', ObjectID2.createFromHexString(objectId.toHexString()), 'db'); | ||
assert.equal(JSON.stringify(dbref1), JSON.stringify(dbref2)); | ||
var symbol1 = new Symbol('hello'); | ||
var symbol2 = new Symbol2('hello'); | ||
assert.equal(JSON.stringify(symbol1), JSON.stringify(symbol2)); | ||
var double1 = new Double(3.232); | ||
var double2 = new Double2(3.232); | ||
assert.equal(JSON.stringify(double1), JSON.stringify(double2)); | ||
var code1 = new Code('hello', {a:1}) | ||
var code2 = new Code2('hello', {a:1}) | ||
assert.equal(JSON.stringify(code1), JSON.stringify(code2)); | ||
var long1 = Long.fromNumber(1000); | ||
var long2 = Long2.fromNumber(1000); | ||
assert.equal(JSON.stringify(long1), JSON.stringify(long2)); | ||
var timestamp1 = Timestamp.fromNumber(1000); | ||
var timestamp2 = Timestamp2.fromNumber(1000); | ||
assert.equal(JSON.stringify(timestamp1), JSON.stringify(timestamp2)); | ||
// Long data type tests | ||
@@ -322,2 +360,16 @@ var l2_string = Long2.fromNumber(100); | ||
// Serialize function | ||
var doc = { | ||
_id: 'testid', | ||
key1: function() {} | ||
} | ||
var simple_string_serialized = BSONJS.serialize(doc, false, true, true); | ||
var simple_string_serialized_2 = BSON.serialize(doc, false, true, true); | ||
// Deserialize the string | ||
var doc1 = BSONJS.deserialize(new Buffer(simple_string_serialized_2)); | ||
var doc2 = BSON.deserialize(new Buffer(simple_string_serialized_2)); | ||
assert.equal(doc1.key1.code.toString(), doc2.key1.code.toString()) | ||
// Force garbage collect | ||
@@ -324,0 +376,0 @@ global.gc(); |
@@ -116,2 +116,6 @@ | ||
Binary.prototype.toJSON = function() { | ||
return this.buffer != null ? this.buffer.toString('base64') : ''; | ||
} | ||
Binary.BUFFER_SIZE = 256; | ||
@@ -118,0 +122,0 @@ |
@@ -72,3 +72,3 @@ /** | ||
// Experiment for performance | ||
BSON.calculateObjectSize = function(object) { | ||
BSON.calculateObjectSize = function(object, serializeFunctions) { | ||
var totalLength = (4 + 1); | ||
@@ -84,2 +84,4 @@ var done = false; | ||
var finished = false; | ||
// Ensure serialized functions set | ||
serializeFunctions = serializeFunctions == null ? false : serializeFunctions; | ||
@@ -129,23 +131,2 @@ while(!done) { | ||
totalLength += (name != null ? (Buffer.byteLength(name) + 1) : 0) + (8 + 1); | ||
// // Write the type | ||
// buffer[index++] = BSON.BSON_DATA_NUMBER; | ||
// // Write the name | ||
// if(name != null) { | ||
// index = index + buffer.write(name, index, 'utf8') + 1; | ||
// buffer[index - 1] = 0; | ||
// } | ||
// | ||
// // Write float | ||
// ieee754.writeIEEE754(buffer, value, index, 'little', 52, 8); | ||
// // Ajust index | ||
// index = index + 8; | ||
// } else if(typeof value == 'number' || toString.call(value) === '[object Number]') { | ||
// if(value >= BSON.BSON_INT32_MAX || value < BSON.BSON_INT32_MIN || | ||
// value !== parseInt(value, 10)) { | ||
// // Long and Number take same number of bytes. | ||
// totalLength += (name != null ? (Buffer.byteLength(name) + 1) : 0) + (8 + 1); | ||
// } else { | ||
// totalLength += (name != null ? (Buffer.byteLength(name) + 1) : 0) + (4 + 1); | ||
// } | ||
} else if(typeof value == 'boolean' || toString.call(value) === '[object Boolean]') { | ||
@@ -210,3 +191,3 @@ totalLength += (name != null ? (Buffer.byteLength(name) + 1) : 0) + (1 + 1); | ||
totalLength += (name != null ? (Buffer.byteLength(name) + 1) : 0) + (Buffer.byteLength(value.value, 'utf8') + 4 + 1 + 1); | ||
} else if(typeof value == 'function') { | ||
} else if(typeof value == 'function' && serializeFunctions) { | ||
// Calculate the length of the code string | ||
@@ -249,3 +230,3 @@ totalLength += (name != null ? (Buffer.byteLength(name) + 1) : 0) + 4 + (Buffer.byteLength(value.toString(), 'utf8') + 1 + 1); | ||
// In place serialization with index to starting point of serialization | ||
BSON.serializeWithBufferAndIndex = function serializeWithBufferAndIndex(object, checkKeys, buffer, startIndex) { | ||
BSON.serializeWithBufferAndIndex = function serializeWithBufferAndIndex(object, checkKeys, buffer, startIndex, serializeFunctions) { | ||
if(null != object && 'object' === typeof object) { | ||
@@ -268,2 +249,4 @@ // Encode the object using single allocated buffer and no recursion | ||
var finished = false; | ||
// Ensure serialized functions set | ||
serializeFunctions = serializeFunctions == null ? false : serializeFunctions; | ||
@@ -669,3 +652,3 @@ // Current parsing object state | ||
buffer[index++] = 0; | ||
} else if(typeof value == 'function') { | ||
} else if(typeof value == 'function' && serializeFunctions) { | ||
// Write the type | ||
@@ -798,5 +781,5 @@ buffer[index++] = BSON.BSON_DATA_CODE; | ||
BSON.serialize = function(object, checkKeys, asBuffer) { | ||
var buffer = new Buffer(BSON.calculateObjectSize(object)); | ||
BSON.serializeWithBufferAndIndex(object, checkKeys, buffer, 0); | ||
BSON.serialize = function(object, checkKeys, asBuffer, serializeFunctions) { | ||
var buffer = new Buffer(BSON.calculateObjectSize(object, serializeFunctions)); | ||
BSON.serializeWithBufferAndIndex(object, checkKeys, buffer, 0, serializeFunctions); | ||
return buffer; | ||
@@ -1328,2 +1311,6 @@ } | ||
Code.prototype.toJSON = function() { | ||
return {scope:this.scope, code:this.code}; | ||
} | ||
/** | ||
@@ -1346,2 +1333,6 @@ * Symbol constructor. | ||
Symbol.prototype.toJSON = function() { | ||
return this.value; | ||
} | ||
/** | ||
@@ -1374,7 +1365,7 @@ * MinKey constructor | ||
DBRef.prototype.toJSON = function() { | ||
return JSON.stringify({ | ||
return { | ||
'$ref':this.namespace, | ||
'$id':this.oid, | ||
'$db':this.db == null ? '' : this.db | ||
}); | ||
}; | ||
} | ||
@@ -1381,0 +1372,0 @@ |
@@ -10,1 +10,5 @@ exports.Double = Double; | ||
}; | ||
Double.prototype.toJSON = function() { | ||
return this.value; | ||
} |
@@ -80,2 +80,4 @@ /** | ||
this.slaveOk = options == null || options.slaveOk == null ? db.slaveOk : options.slaveOk; | ||
this.serializeFunctions = options == null || options.serializeFunctions == null ? db.serializeFunctions : options.serializeFunctions; | ||
this.raw = options == null || options.raw == null ? db.raw : options.raw; | ||
@@ -187,3 +189,3 @@ this.pkFactory = pkFactory == null | ||
// Execute command with safe options (rolls up both command and safe command into one and executes them on the same connection) | ||
this.db.executeCommand(deleteCommand, commandOptions, function (err, error) { | ||
this.db._executeRemoveCommand(deleteCommand, commandOptions, function (err, error) { | ||
error = error && error.documents; | ||
@@ -201,3 +203,3 @@ if(!callback) return; | ||
} else { | ||
var result = this.db.executeCommand(deleteCommand); | ||
var result = this.db._executeRemoveCommand(deleteCommand); | ||
// If no callback just return | ||
@@ -248,3 +250,3 @@ if (!callback) return; | ||
if(!('function' === typeof callback)) callback = null; | ||
// Insert options (flags for insert) | ||
@@ -256,2 +258,10 @@ var insertFlags = {}; | ||
} | ||
// Either use override on the function, or go back to default on either the collection | ||
// level or db | ||
if(options['serializeFunctions'] != null) { | ||
insertFlags['serializeFunctions'] = options['serializeFunctions']; | ||
} else { | ||
insertFlags['serializeFunctions'] = this.serializeFunctions; | ||
} | ||
@@ -266,5 +276,5 @@ // Pass in options | ||
var doc = docs[index]; | ||
// Add id to each document if it's not already defined | ||
if (!doc['_id'] && this.db.forceServerObjectId != true) { | ||
if (!(doc instanceof Buffer) && !doc['_id'] && this.db.forceServerObjectId != true) { | ||
doc['_id'] = this.pkFactory.createPk(); | ||
@@ -283,4 +293,3 @@ } | ||
// Default command options | ||
var commandOptions = {}; | ||
var commandOptions = {}; | ||
// If safe is defined check for error message | ||
@@ -305,3 +314,3 @@ // if(options != null && (options.safe == true || this.db.strict == true || this.opts.safe == true)) { | ||
// Execute command with safe options (rolls up both command and safe command into one and executes them on the same connection) | ||
this.db.executeCommand(insertCommand, commandOptions, function (err, error) { | ||
this.db._executeInsertCommand(insertCommand, commandOptions, function (err, error) { | ||
error = error && error.documents; | ||
@@ -319,3 +328,3 @@ if(!callback) return; | ||
} else { | ||
var result = this.db.executeCommand(insertCommand, commandOptions); | ||
var result = this.db._executeInsertCommand(insertCommand, commandOptions); | ||
// If no callback just return | ||
@@ -386,2 +395,10 @@ if(!callback) return; | ||
// Either use override on the function, or go back to default on either the collection | ||
// level or db | ||
if(options['serializeFunctions'] != null) { | ||
options['serializeFunctions'] = options['serializeFunctions']; | ||
} else { | ||
options['serializeFunctions'] = this.serializeFunctions; | ||
} | ||
var updateCommand = new UpdateCommand( | ||
@@ -399,3 +416,3 @@ this.db | ||
errorOptions = errorOptions == null && this.db.strict != null ? this.db.strict : errorOptions; | ||
// If we are executing in strict mode or safe both the update and the safe command must happen on the same line | ||
@@ -418,3 +435,3 @@ if(errorOptions && errorOptions != false) { | ||
// Execute command with safe options (rolls up both command and safe command into one and executes them on the same connection) | ||
this.db.executeCommand(updateCommand, commandOptions, function (err, error) { | ||
this.db._executeUpdateCommand(updateCommand, commandOptions, function (err, error) { | ||
error = error && error.documents; | ||
@@ -432,3 +449,4 @@ if(!callback) return; | ||
} else { | ||
var result = this.db.executeCommand(updateCommand); | ||
// Execute update | ||
var result = this.db._executeUpdateCommand(updateCommand); | ||
// If no callback just return | ||
@@ -463,3 +481,3 @@ if (!callback) return; | ||
this.db.executeCommand(cmd, {read:true}, function (err, result) { | ||
this.db._executeQueryCommand(cmd, {read:true}, function (err, result) { | ||
if (err) { | ||
@@ -509,3 +527,3 @@ return callback(err); | ||
var self = this; | ||
this.db.executeCommand(queryCommand, {read:true}, function (err, result) { | ||
this.db._executeQueryCommand(queryCommand, {read:true}, function (err, result) { | ||
result = result && result.documents; | ||
@@ -573,2 +591,10 @@ if(!callback) return; | ||
} | ||
// Either use override on the function, or go back to default on either the collection | ||
// level or db | ||
if(options['serializeFunctions'] != null) { | ||
options['serializeFunctions'] = options['serializeFunctions']; | ||
} else { | ||
options['serializeFunctions'] = this.serializeFunctions; | ||
} | ||
@@ -582,8 +608,7 @@ // Unpack the error options if any | ||
var connection = this.db.serverConfig.checkoutWriter(); | ||
var rawConnection = connection.getConnection().connection; | ||
// Ensure we execute against the same raw connection so we can get the correct error | ||
// result after the execution of the findAndModify finishes | ||
this.db.executeDbCommand(queryObject, {writer:rawConnection, allReturn:true, safe:errorOptions}, function (err, result) { | ||
result = result && result.documents; | ||
this.db.executeDbCommand(queryObject, {writer:connection, serializeFunctions:options['serializeFunctions']}, function (err, firstResult) { | ||
firstResult = firstResult && firstResult.documents; | ||
if(!callback) return; | ||
@@ -593,6 +618,17 @@ | ||
callback(err); | ||
} else if(!result[0].ok) { | ||
callback(self.db.wrap(result[0])); | ||
} else if(!firstResult[0].ok) { | ||
callback(self.db.wrap(firstResult[0])); | ||
} else { | ||
return callback(null, result[0].value); | ||
// If we have a request for a last error command | ||
if(errorOptions != null && errorOptions != false) { | ||
self.db.lastError(errorOptions, {writer:connection}, function(err, secondResult) { | ||
if(secondResult[0].err != null) { | ||
callback(self.db.wrap(secondResult[0])); | ||
} else { | ||
return callback(null, firstResult[0].value); | ||
} | ||
}) | ||
} else { | ||
return callback(null, firstResult[0].value); | ||
} | ||
} | ||
@@ -636,3 +672,3 @@ }); | ||
// backwards compat for options object | ||
var test = ['limit','sort','fields','skip','hint','explain','snapshot','timeout','tailable', 'batchSize'] | ||
var test = ['limit','sort','fields','skip','hint','explain','snapshot','timeout','tailable', 'batchSize', 'raw'] | ||
, is_option = false; | ||
@@ -659,3 +695,32 @@ | ||
if (options && options.fields) { | ||
      // Validate correctness of the selector  | ||
var object = selector; | ||
if(object instanceof Buffer) { | ||
var object_size = object[0] | object[1] << 8 | object[2] << 16 | object[3] << 24; | ||
if(object_size != object.length) { | ||
var error = new Error("query selector raw message size does not match message header size [" + object.length + "] != [" + object_size + "]"); | ||
error.name = 'MongoError'; | ||
throw error; | ||
} | ||
} | ||
// Validate correctness of the field selector | ||
var object = fields; | ||
if(object instanceof Buffer) { | ||
var object_size = object[0] | object[1] << 8 | object[2] << 16 | object[3] << 24; | ||
if(object_size != object.length) { | ||
var error = new Error("query fields raw message size does not match message header size [" + object.length + "] != [" + object_size + "]"); | ||
error.name = 'MongoError'; | ||
throw error; | ||
} | ||
} | ||
// Check special case where we are using an objectId | ||
if(selector instanceof this.db.bson_serializer.ObjectID) { | ||
selector = {_id:selector}; | ||
} | ||
// If it's a serialized fields field we need to just let it through | ||
// user be warned it better be good | ||
if (options && options.fields && !(options.fields instanceof Buffer)) { | ||
fields = {}; | ||
@@ -676,5 +741,5 @@ if (Array.isArray(options.fields)) { | ||
if (!options) options = {}; | ||
options.skip = len > 3 ? args[2] : options.skip ? options.skip : 0; | ||
options.limit = len > 3 ? args[3] : options.limit ? options.limit : 0; | ||
options.raw = options.raw != null && typeof options.raw === 'boolean' ? options.raw : this.raw; | ||
options.hint = options.hint != null ? this.normalizeHintField(options.hint) : this.internalHint; | ||
@@ -684,11 +749,10 @@ options.timeout = len == 5 ? args[4] : typeof options.timeout === 'undefined' ? undefined : options.timeout; | ||
options.slaveOk = options.slaveOk != null ? options.slaveOk : this.db.slaveOk; | ||
var o = options; | ||
// callback for backward compatibility | ||
if (callback) { | ||
// TODO refactor Cursor args | ||
callback(null, new Cursor(this.db, this, selector, fields, o.skip, o.limit, o.sort, o.hint, o.explain, o.snapshot, o.timeout, o.tailable, o.batchSize, o.slaveOk)); | ||
callback(null, new Cursor(this.db, this, selector, fields, o.skip, o.limit, o.sort, o.hint, o.explain, o.snapshot, o.timeout, o.tailable, o.batchSize, o.slaveOk, o.raw)); | ||
} else { | ||
return new Cursor(this.db, this, selector, fields, o.skip, o.limit, o.sort, o.hint, o.explain, o.snapshot, o.timeout, o.tailable, o.batchSize, o.slaveOk); | ||
return new Cursor(this.db, this, selector, fields, o.skip, o.limit, o.sort, o.hint, o.explain, o.snapshot, o.timeout, o.tailable, o.batchSize, o.slaveOk, o.raw); | ||
} | ||
@@ -738,66 +802,145 @@ }; | ||
*/ | ||
/** | ||
* Various argument possibilities | ||
* TODO : combine/reduce # of possibilities | ||
* 1 callback? | ||
* 2 selector, callback?, | ||
* 2 callback?, options // really?! | ||
* 3 selector, fields, callback? | ||
* 3 selector, options, callback? | ||
* 4,selector, fields, options, callback? | ||
* 5 selector, fields, skip, limit, callback? | ||
* 6 selector, fields, skip, limit, timeout, callback? | ||
* | ||
* Available options: | ||
* limit, sort, fields, skip, hint, explain, snapshot, timeout, tailable, batchSize | ||
*/ | ||
Collection.prototype.findOne = function findOne (queryObject, options, callback) { | ||
Collection.prototype.findOne = function findOne () { | ||
var self = this; | ||
if ('function' === typeof queryObject) { | ||
callback = queryObject; | ||
queryObject = {}; | ||
options = {}; | ||
} else if ('function' === typeof options) { | ||
callback = options; | ||
options = {}; | ||
var options | ||
, args = Array.prototype.slice.call(arguments, 0) | ||
, has_callback = typeof args[args.length - 1] === 'function' | ||
, has_weird_callback = typeof args[0] === 'function' | ||
, callback = has_callback ? args.pop() : (has_weird_callback ? args.shift() : null) | ||
, len = args.length | ||
, selector = len >= 1 ? args[0] : {} | ||
, fields = len >= 2 ? args[1] : undefined; | ||
if (len === 1 && has_weird_callback) { | ||
// backwards compat for callback?, options case | ||
selector = {}; | ||
options = args[0]; | ||
} | ||
var fields; | ||
if (len === 2) { | ||
// backwards compat for options object | ||
var test = ['limit','sort','fields','skip','hint','explain','snapshot','timeout','tailable', 'batchSize', 'raw'] | ||
, is_option = false; | ||
if (options.fields && Array.isArray(options.fields)) { | ||
fields = {}; | ||
if (0 === options.fields.length) { | ||
fields['_id'] = 1; | ||
} else { | ||
for (var i = 0, len = options.fields.length; i < len; ++i) { | ||
fields[options.fields[i]] = 1; | ||
for (var idx = 0, l = test.length; idx < l; ++idx) { | ||
if (test[idx] in fields) { | ||
is_option = true; | ||
break; | ||
} | ||
} | ||
} else { | ||
fields = options.fields; | ||
if (is_option) { | ||
options = fields; | ||
fields = undefined; | ||
} else { | ||
options = {}; | ||
} | ||
} | ||
if (queryObject instanceof this.db.bson_serializer.ObjectID || | ||
'[object ObjectID]' === toString.call(queryObject)) { | ||
queryObject = { '_id': queryObject }; | ||
if (3 === len) { | ||
options = args[2]; | ||
} | ||
var query = { 'query': queryObject }; | ||
// If we have a timeout set | ||
var timeout = null != options.timeout | ||
? options.timeout | ||
: QueryCommand.OPTS_NONE; | ||
// Make sure we can do slaveOk commands | ||
if (options.slaveOk || this.slaveOk || this.db.slaveOk) { | ||
timeout |= QueryCommand.OPTS_SLAVE; | ||
    // Validate correctness of the selector  | ||
var object = selector; | ||
if(object instanceof Buffer) { | ||
var object_size = object[0] | object[1] << 8 | object[2] << 16 | object[3] << 24; | ||
if(object_size != object.length) { | ||
var error = new Error("query selector raw message size does not match message header size [" + object.length + "] != [" + object_size + "]"); | ||
error.name = 'MongoError'; | ||
throw error; | ||
} | ||
} | ||
// Validate correctness of the field selector | ||
var object = fields; | ||
if(object instanceof Buffer) { | ||
var object_size = object[0] | object[1] << 8 | object[2] << 16 | object[3] << 24; | ||
if(object_size != object.length) { | ||
var error = new Error("query fields raw message size does not match message header size [" + object.length + "] != [" + object_size + "]"); | ||
error.name = 'MongoError'; | ||
throw error; | ||
} | ||
} | ||
// Check special case where we are using an objectId | ||
if(selector instanceof this.db.bson_serializer.ObjectID) { | ||
selector = {_id:selector}; | ||
} | ||
var collectionName = (this.db.databaseName ? this.db.databaseName + '.' : '') | ||
+ this.collectionName; | ||
var queryCommand = new QueryCommand( | ||
this.db | ||
, collectionName | ||
, timeout | ||
, 0 | ||
, 1 | ||
, query | ||
, fields); | ||
this.db.executeCommand(queryCommand, {read:true}, function (err, result) { | ||
if (!err && result.documents[0] && result.documents[0]['$err']) { | ||
return callback(self.db.wrap(result)); | ||
// If it's a serialized fields field we need to just let it through | ||
// user be warned it better be good | ||
if (options && options.fields && !(options.fields instanceof Buffer)) { | ||
fields = {}; | ||
if (Array.isArray(options.fields)) { | ||
if (!options.fields.length) { | ||
fields['_id'] = 1; | ||
} else { | ||
for (var i = 0, l = options.fields.length; i < l; i++) { | ||
fields[options.fields[i]] = 1; | ||
} | ||
} | ||
} else { | ||
fields = options.fields; | ||
} | ||
} | ||
callback(err, result && result.documents && result.documents.length > 0 ? result.documents[0] : null); | ||
if (!options) options = {}; | ||
options.skip = len > 3 ? args[2] : options.skip ? options.skip : 0; | ||
options.limit = len > 3 ? args[3] : options.limit ? options.limit : 0; | ||
options.raw = options.raw != null && typeof options.raw === 'boolean' ? options.raw : this.raw; | ||
options.hint = options.hint != null ? this.normalizeHintField(options.hint) : this.internalHint; | ||
options.timeout = len == 5 ? args[4] : typeof options.timeout === 'undefined' ? undefined : options.timeout; | ||
// If we have overridden slaveOk otherwise use the default db setting | ||
options.slaveOk = options.slaveOk != null ? options.slaveOk : this.db.slaveOk; | ||
// Create cursor instance | ||
var o = options; | ||
var cursor = new Cursor(this.db, this, selector, fields, o.skip, 1, o.sort, o.hint, o.explain, o.snapshot, o.timeout, o.tailable, o.batchSize, o.slaveOk, o.raw); | ||
cursor.toArray(function(err, items) { | ||
if(err != null) return callback(err instanceof Error ? err : self.db.wrap(new Error(err)), null); | ||
if(items.length == 1) return callback(null, items[0]); | ||
callback(null, null); | ||
}); | ||
// // callback for backward compatibility | ||
// if (callback) { | ||
// // TODO refactor Cursor args | ||
// callback(null, new Cursor(this.db, this, selector, fields, o.skip, o.limit, o.sort, o.hint, o.explain, o.snapshot, o.timeout, o.tailable, o.batchSize, o.slaveOk, o.raw)); | ||
// } else { | ||
// return new Cursor(this.db, this, selector, fields, o.skip, o.limit, o.sort, o.hint, o.explain, o.snapshot, o.timeout, o.tailable, o.batchSize, o.slaveOk, o.raw); | ||
// } | ||
// var self = this; | ||
// // Get all the arguments | ||
// var args = Array.prototype.slice.call(arguments, 0); | ||
// // Retrieve the callback | ||
// var callback = args.pop(); | ||
// try { | ||
// // Call find function with one item | ||
// this.find.apply(this, args).limit(1).toArray(function(err, items) { | ||
// if(err != null) return callback(err instanceof Error ? err : self.db.wrap(new Error(err)), null); | ||
// if(items.length == 1) return callback(null, items[0]); | ||
// callback(null, null); | ||
// }); | ||
// | ||
// }catch(err) { | ||
// console.log("----------------------------------- 444444444444444444444444444444444") | ||
// console.dir(err) | ||
// } | ||
}; | ||
@@ -922,3 +1065,3 @@ | ||
this.db.executeCommand(cmd, {read:true}, function (err, result) { | ||
this.db._executeQueryCommand(cmd, {read:true}, function (err, result) { | ||
if (err) { | ||
@@ -1040,3 +1183,3 @@ return callback(err); | ||
this.db.executeCommand(cmd, {read:true}, function (err, result) { | ||
this.db._executeQueryCommand(cmd, {read:true}, function (err, result) { | ||
if (err) return callback(err); | ||
@@ -1043,0 +1186,0 @@ |
@@ -17,2 +17,7 @@ var BinaryParser = require('../bson/binary_parser').BinaryParser, | ||
BaseCommand.prototype.updateRequestId = function() { | ||
this.requestId = id++; | ||
return this.requestId; | ||
}; | ||
// OpCodes | ||
@@ -19,0 +24,0 @@ BaseCommand.OP_REPLY = 1; |
@@ -11,5 +11,4 @@ var QueryCommand = require('./query_command').QueryCommand, | ||
**/ | ||
var DbCommand = exports.DbCommand = function(db, collectionName, queryOptions, numberToSkip, numberToReturn, query, returnFieldSelector) { | ||
QueryCommand.call(db, this); | ||
var DbCommand = exports.DbCommand = function(dbInstance, collectionName, queryOptions, numberToSkip, numberToReturn, query, returnFieldSelector, options) { | ||
QueryCommand.call(this); | ||
this.collectionName = collectionName; | ||
@@ -21,3 +20,10 @@ this.queryOptions = queryOptions; | ||
this.returnFieldSelector = returnFieldSelector; | ||
this.db = db; | ||
this.db = dbInstance; | ||
// Make sure we don't get a null exception | ||
options = options == null ? {} : options; | ||
// Let us defined on a command basis if we want functions to be serialized or not | ||
if(options['serializeFunctions'] != null && options['serializeFunctions']) { | ||
this.serializeFunctions = true; | ||
} | ||
}; | ||
@@ -34,2 +40,7 @@ | ||
// New commands | ||
DbCommand.NcreateIsMasterCommand = function(db, databaseName) { | ||
return new DbCommand(db, databaseName + "." + DbCommand.SYSTEM_COMMAND_COLLECTION, QueryCommand.OPTS_NO_CURSOR_TIMEOUT, 0, -1, {'ismaster':1}, null); | ||
}; | ||
// Provide constructors for different db commands | ||
@@ -185,4 +196,4 @@ DbCommand.createIsMasterCommand = function(db) { | ||
DbCommand.createDbCommand = function(db, command_hash) { | ||
return new DbCommand(db, db.databaseName + "." + DbCommand.SYSTEM_COMMAND_COLLECTION, QueryCommand.OPTS_NO_CURSOR_TIMEOUT, 0, -1, command_hash, null); | ||
DbCommand.createDbCommand = function(db, command_hash, options) { | ||
return new DbCommand(db, db.databaseName + "." + DbCommand.SYSTEM_COMMAND_COLLECTION, QueryCommand.OPTS_NO_CURSOR_TIMEOUT, 0, -1, command_hash, null, options); | ||
}; | ||
@@ -189,0 +200,0 @@ |
@@ -11,2 +11,13 @@ var BaseCommand = require('./base_command').BaseCommand, | ||
BaseCommand.call(this); | ||
  // Validate correctness of the selector  | ||
var object = selector; | ||
if(object instanceof Buffer) { | ||
var object_size = object[0] | object[1] << 8 | object[2] << 16 | object[3] << 24; | ||
if(object_size != object.length) { | ||
var error = new Error("delete raw message size does not match message header size [" + object.length + "] != [" + object_size + "]"); | ||
error.name = 'MongoError'; | ||
throw error; | ||
} | ||
} | ||
@@ -79,5 +90,16 @@ this.collectionName = collectionName; | ||
_command[_index++] = 0; | ||
// Document binary length | ||
var documentLength = 0 | ||
// Serialize the selector | ||
// If we are passing a raw buffer, do minimal validation | ||
if(this.selector instanceof Buffer) { | ||
documentLength = this.selector.length; | ||
// Copy the data into the current buffer | ||
this.selector.copy(_command, _index); | ||
} else { | ||
documentLength = this.db.bson_serializer.BSON.serializeWithBufferAndIndex(this.selector, this.checkKeys, _command, _index) - _index + 1; | ||
} | ||
// Serialize the selector | ||
var documentLength = this.db.bson_serializer.BSON.serializeWithBufferAndIndex(this.selector, this.checkKeys, _command, _index) - _index + 1; | ||
// Write the length to the document | ||
@@ -84,0 +106,0 @@ _command[_index + 3] = (documentLength >> 24) & 0xff; |
@@ -24,4 +24,2 @@ var BaseCommand = require('./base_command').BaseCommand, | ||
GetMoreCommand.prototype.toBinary = function() { | ||
// debug("======================================================= GETMORE") | ||
// debug("================ " + this.db.bson_serializer.BSON.calculateObjectSize(this.query)) | ||
// Calculate total length of the document | ||
@@ -28,0 +26,0 @@ var totalLengthOfCommand = 4 + Buffer.byteLength(this.collectionName) + 1 + 4 + 8 + (4 * 4); |
@@ -17,2 +17,3 @@ var BaseCommand = require('./base_command').BaseCommand, | ||
this.flags = 0; | ||
this.serializeFunctions = false; | ||
@@ -27,2 +28,7 @@ // Ensure valid options hash | ||
} | ||
// Let us defined on a command basis if we want functions to be serialized or not | ||
if(options['serializeFunctions'] != null && options['serializeFunctions']) { | ||
this.serializeFunctions = true; | ||
} | ||
}; | ||
@@ -36,2 +42,11 @@ | ||
InsertCommand.prototype.add = function(document) { | ||
if(document instanceof Buffer) { | ||
var object_size = document[0] | document[1] << 8 | document[2] << 16 | document[3] << 24; | ||
if(object_size != document.length) { | ||
var error = new Error("insert raw message size does not match message header size [" + document.length + "] != [" + object_size + "]"); | ||
error.name = 'MongoError'; | ||
throw error; | ||
} | ||
} | ||
this.documents.push(document); | ||
@@ -54,4 +69,8 @@ return this; | ||
for(var i = 0; i < this.documents.length; i++) { | ||
// Calculate size of document | ||
totalLengthOfCommand += this.db.bson_serializer.BSON.calculateObjectSize(this.documents[i]); | ||
if(this.documents[i] instanceof Buffer) { | ||
totalLengthOfCommand += this.documents[i].length; | ||
} else { | ||
// Calculate size of document | ||
totalLengthOfCommand += this.db.bson_serializer.BSON.calculateObjectSize(this.documents[i], this.serializeFunctions); | ||
} | ||
} | ||
@@ -98,7 +117,20 @@ | ||
_command[_index - 1] = 0; | ||
// Write all the bson documents to the buffer at the index offset | ||
for(var i = 0; i < this.documents.length; i++) { | ||
// Serialize the document straight to the buffer | ||
var documentLength = this.db.bson_serializer.BSON.serializeWithBufferAndIndex(this.documents[i], this.checkKeys, _command, _index) - _index + 1; | ||
// Document binary length | ||
var documentLength = 0 | ||
var object = this.documents[i]; | ||
// Serialize the selector | ||
// If we are passing a raw buffer, do minimal validation | ||
if(object instanceof Buffer) { | ||
documentLength = object.length; | ||
// Copy the data into the current buffer | ||
object.copy(_command, _index); | ||
} else { | ||
// Serialize the document straight to the buffer | ||
documentLength = this.db.bson_serializer.BSON.serializeWithBufferAndIndex(object, this.checkKeys, _command, _index, this.serializeFunctions) - _index + 1; | ||
} | ||
// Write the length to the document | ||
@@ -105,0 +137,0 @@ _command[_index + 3] = (documentLength >> 24) & 0xff; |
@@ -10,5 +10,29 @@ var BaseCommand = require('./base_command').BaseCommand, | ||
**/ | ||
var QueryCommand = exports.QueryCommand = function(db, collectionName, queryOptions, numberToSkip, numberToReturn, query, returnFieldSelector) { | ||
var QueryCommand = exports.QueryCommand = function(db, collectionName, queryOptions, numberToSkip, numberToReturn, query, returnFieldSelector, options) { | ||
BaseCommand.call(this); | ||
  // Validate correctness of the selector  | ||
var object = query; | ||
if(object instanceof Buffer) { | ||
var object_size = object[0] | object[1] << 8 | object[2] << 16 | object[3] << 24; | ||
if(object_size != object.length) { | ||
var error = new Error("query selector raw message size does not match message header size [" + object.length + "] != [" + object_size + "]"); | ||
error.name = 'MongoError'; | ||
throw error; | ||
} | ||
} | ||
var object = returnFieldSelector; | ||
if(object instanceof Buffer) { | ||
var object_size = object[0] | object[1] << 8 | object[2] << 16 | object[3] << 24; | ||
if(object_size != object.length) { | ||
var error = new Error("query fields raw message size does not match message header size [" + object.length + "] != [" + object_size + "]"); | ||
error.name = 'MongoError'; | ||
throw error; | ||
} | ||
} | ||
// Make sure we don't get a null exception | ||
options = options == null ? {} : options; | ||
// Set up options | ||
this.collectionName = collectionName; | ||
@@ -21,2 +45,7 @@ this.queryOptions = queryOptions; | ||
this.db = db; | ||
// Let us defined on a command basis if we want functions to be serialized or not | ||
if(options['serializeFunctions'] != null && options['serializeFunctions']) { | ||
this.serializeFunctions = true; | ||
} | ||
}; | ||
@@ -40,12 +69,17 @@ | ||
QueryCommand.prototype.toBinary = function() { | ||
// debug("======================================================= QUERY") | ||
// debug("================ " + this.db.bson_serializer.BSON.calculateObjectSize(this.query)) | ||
var totalLengthOfCommand = 0; | ||
// Calculate total length of the document | ||
if(this.query instanceof Buffer) { | ||
totalLengthOfCommand = 4 + Buffer.byteLength(this.collectionName) + 1 + 4 + 4 + this.query.length + (4 * 4); | ||
} else { | ||
totalLengthOfCommand = 4 + Buffer.byteLength(this.collectionName) + 1 + 4 + 4 + this.db.bson_serializer.BSON.calculateObjectSize(this.query, this.serializeFunctions) + (4 * 4); | ||
} | ||
// Calculate total length of the document | ||
var totalLengthOfCommand = 4 + Buffer.byteLength(this.collectionName) + 1 + 4 + 4 + this.db.bson_serializer.BSON.calculateObjectSize(this.query) + (4 * 4); | ||
// Calculate extra fields size | ||
if(this.returnFieldSelector != null) { | ||
if(this.returnFieldSelector != null && !(this.returnFieldSelector instanceof Buffer)) { | ||
if(Object.keys(this.returnFieldSelector).length > 0) { | ||
totalLengthOfCommand += this.db.bson_serializer.BSON.calculateObjectSize(this.returnFieldSelector); | ||
totalLengthOfCommand += this.db.bson_serializer.BSON.calculateObjectSize(this.returnFieldSelector, this.serializeFunctions); | ||
} | ||
} else if(this.returnFieldSelector instanceof Buffer) { | ||
totalLengthOfCommand += this.returnFieldSelector.length; | ||
} | ||
@@ -110,7 +144,17 @@ | ||
_index = _index + 4; | ||
// Document binary length | ||
var documentLength = 0 | ||
var object = this.query; | ||
// Serialize the selector | ||
if(object instanceof Buffer) { | ||
documentLength = object.length; | ||
// Copy the data into the current buffer | ||
object.copy(_command, _index); | ||
} else { | ||
// Serialize the document straight to the buffer | ||
documentLength = this.db.bson_serializer.BSON.serializeWithBufferAndIndex(object, this.checkKeys, _command, _index, this.serializeFunctions) - _index + 1; | ||
} | ||
// Serialize the query document straight to the buffer | ||
var documentLength = this.db.bson_serializer.BSON.serializeWithBufferAndIndex(this.query, this.checkKeys, _command, _index) - _index + 1; | ||
// debug(inspect("===================== documentLength :: " + documentLength)) | ||
// Write the length to the document | ||
@@ -127,5 +171,5 @@ _command[_index + 3] = (documentLength >> 24) & 0xff; | ||
// Push field selector if available | ||
if(this.returnFieldSelector != null) { | ||
if(this.returnFieldSelector != null && !(this.returnFieldSelector instanceof Buffer)) { | ||
if(Object.keys(this.returnFieldSelector).length > 0) { | ||
var documentLength = this.db.bson_serializer.BSON.serializeWithBufferAndIndex(this.returnFieldSelector, this.checkKeys, _command, _index) - _index + 1; | ||
var documentLength = this.db.bson_serializer.BSON.serializeWithBufferAndIndex(this.returnFieldSelector, this.checkKeys, _command, _index, this.serializeFunctions) - _index + 1; | ||
// Write the length to the document | ||
@@ -141,7 +185,24 @@ _command[_index + 3] = (documentLength >> 24) & 0xff; | ||
} | ||
} if(this.returnFieldSelector != null && this.returnFieldSelector instanceof Buffer) { | ||
// Document binary length | ||
var documentLength = 0 | ||
var object = this.returnFieldSelector; | ||
// Serialize the selector | ||
documentLength = object.length; | ||
// Copy the data into the current buffer | ||
object.copy(_command, _index); | ||
// Write the length to the document | ||
_command[_index + 3] = (documentLength >> 24) & 0xff; | ||
_command[_index + 2] = (documentLength >> 16) & 0xff; | ||
_command[_index + 1] = (documentLength >> 8) & 0xff; | ||
_command[_index] = documentLength & 0xff; | ||
// Update index in buffer | ||
_index = _index + documentLength; | ||
// Add terminating 0 for the object | ||
_command[_index - 1] = 0; | ||
} | ||
// debug("------------------------------------------------------------------------") | ||
// debug(inspect(_command)) | ||
// Return finished command | ||
return _command; | ||
@@ -148,0 +209,0 @@ }; |
@@ -12,2 +12,22 @@ var BaseCommand = require('./base_command').BaseCommand, | ||
var object = spec; | ||
if(object instanceof Buffer) { | ||
var object_size = object[0] | object[1] << 8 | object[2] << 16 | object[3] << 24; | ||
if(object_size != object.length) { | ||
var error = new Error("update spec raw message size does not match message header size [" + object.length + "] != [" + object_size + "]"); | ||
error.name = 'MongoError'; | ||
throw error; | ||
} | ||
} | ||
var object = document; | ||
if(object instanceof Buffer) { | ||
var object_size = object[0] | object[1] << 8 | object[2] << 16 | object[3] << 24; | ||
if(object_size != object.length) { | ||
var error = new Error("update document raw message size does not match message header size [" + object.length + "] != [" + object_size + "]"); | ||
error.name = 'MongoError'; | ||
throw error; | ||
} | ||
} | ||
this.collectionName = collectionName; | ||
@@ -17,2 +37,3 @@ this.spec = spec; | ||
this.db = db; | ||
this.serializeFunctions = false; | ||
@@ -27,2 +48,6 @@ // Generate correct flags | ||
this.flags = parseInt(db_multi_update.toString() + db_upsert.toString(), 2); | ||
// Let us defined on a command basis if we want functions to be serialized or not | ||
if(options['serializeFunctions'] != null && options['serializeFunctions']) { | ||
this.serializeFunctions = true; | ||
} | ||
}; | ||
@@ -46,4 +71,4 @@ | ||
// Calculate total length of the document | ||
var totalLengthOfCommand = 4 + Buffer.byteLength(this.collectionName) + 1 + 4 + this.db.bson_serializer.BSON.calculateObjectSize(this.spec) + | ||
this.db.bson_serializer.BSON.calculateObjectSize(this.document) + (4 * 4); | ||
var totalLengthOfCommand = 4 + Buffer.byteLength(this.collectionName) + 1 + 4 + this.db.bson_serializer.BSON.calculateObjectSize(this.spec, false) + | ||
this.db.bson_serializer.BSON.calculateObjectSize(this.document, this.serializeFunctions) + (4 * 4); | ||
@@ -98,4 +123,18 @@ // Let's build the single pass buffer command | ||
// Serialize the spec document | ||
var documentLength = this.db.bson_serializer.BSON.serializeWithBufferAndIndex(this.spec, this.checkKeys, _command, _index) - _index + 1; | ||
// Document binary length | ||
var documentLength = 0 | ||
var object = this.spec; | ||
// Serialize the selector | ||
// If we are passing a raw buffer, do minimal validation | ||
if(object instanceof Buffer) { | ||
var object_size = object[0] | object[1] << 8 | object[2] << 16 | object[3] << 24; | ||
if(object_size != object.length) throw new Error("raw message size does not match message header size [" + object.length + "] != [" + object_size + "]"); | ||
documentLength = object.length; | ||
// Copy the data into the current buffer | ||
object.copy(_command, _index); | ||
} else { | ||
documentLength = this.db.bson_serializer.BSON.serializeWithBufferAndIndex(object, this.checkKeys, _command, _index, false) - _index + 1; | ||
} | ||
// Write the length to the document | ||
@@ -111,4 +150,18 @@ _command[_index + 3] = (documentLength >> 24) & 0xff; | ||
// Document binary length | ||
var documentLength = 0 | ||
var object = this.document; | ||
// Serialize the document | ||
var documentLength = this.db.bson_serializer.BSON.serializeWithBufferAndIndex(this.document, this.checkKeys, _command, _index) - _index + 1; | ||
// If we are passing a raw buffer, do minimal validation | ||
if(object instanceof Buffer) { | ||
var object_size = object[0] | object[1] << 8 | object[2] << 16 | object[3] << 24; | ||
if(object_size != object.length) throw new Error("raw message size does not match message header size [" + object.length + "] != [" + object_size + "]"); | ||
documentLength = object.length; | ||
// Copy the data into the current buffer | ||
object.copy(_command, _index); | ||
} else { | ||
documentLength = this.db.bson_serializer.BSON.serializeWithBufferAndIndex(object, this.checkKeys, _command, _index, this.serializeFunctions) - _index + 1; | ||
} | ||
// Write the length to the document | ||
@@ -115,0 +168,0 @@ _command[_index + 3] = (documentLength >> 24) & 0xff; |
var utils = require('./connection_utils'), | ||
inherits = require('util').inherits, | ||
net = require('net'), | ||
EventEmitter = require("events").EventEmitter, | ||
EventEmitter = require('events').EventEmitter, | ||
inherits = require('util').inherits, | ||
MongoReply = require("../responses/mongo_reply").MongoReply, | ||
Connection = require("./connection").Connection; | ||
var ConnectionPool = exports.ConnectionPool = function(host, port, poolSize, socketOptions) { | ||
if(typeof host !== 'string' || typeof port !== 'number') throw "host and port must be specified"; | ||
var ConnectionPool = exports.ConnectionPool = function(host, port, poolSize, bson, socketOptions) { | ||
if(typeof host !== 'string' || typeof port !== 'number') throw "host and port must be specified [" + host + ":" + port + "]"; | ||
// Set up event emitter | ||
EventEmitter.call(this); | ||
// Keep all options for the socket in a specific collection allowing the user to specify the | ||
@@ -15,2 +19,3 @@ // Wished upon socket connection parameters | ||
this.socketOptions.poolSize = poolSize; | ||
this.bson = bson; | ||
@@ -39,29 +44,76 @@ // Set host variable or default | ||
this.openConnections = {}; | ||
// Assign connection id's | ||
this.connectionId = 0; | ||
// Just keeps list of events we allow | ||
// this.eventHandlers = {error:[], parseError:[], poolReady:[], message:[], close:[]}; | ||
// Current connection to pick | ||
this.currentConnectionIndex = 0; | ||
// The pool state | ||
this._poolState = 'not connected'; | ||
} | ||
// Inherit event emitter so we can emit stuff wohoo | ||
inherits(ConnectionPool, EventEmitter); | ||
ConnectionPool.prototype.setMaxBsonSize = function(maxBsonSize) { | ||
var keys = Object.keys(this.openConnections); | ||
if ( typeof maxBsonSize == 'undefined' ){ | ||
maxBsonSize = Connection.DEFAULT_MAX_BSON_SIZE; | ||
} | ||
for(var i = 0; i < keys.length; i++) { | ||
this.openConnections[keys[i]].maxBsonSize = maxBsonSize; | ||
} | ||
} | ||
// Creates handlers | ||
var connectHandler = function(self) { | ||
return function(err) { | ||
// this references the connection object | ||
if(err) { | ||
return function(err, connection) { | ||
// Ensure we don't fire same error message multiple times | ||
var fireError = true; | ||
var performedOperation = false; | ||
// if we have an error and we have not already put the connection into the list of connections with errors | ||
if(err && Object.keys(self.waitingToOpen).length > 0 && self.openConnections[connection.id] == null && self.connectionsWithErrors[connection.id] == null) { | ||
// Add to list of error connections | ||
self.connectionsWithErrors[this.id] = this; | ||
self.connectionsWithErrors[connection.id] = connection; | ||
// Remove from list of waiting to connect | ||
delete self.waitingToOpen[this.id]; | ||
// Emit error so rest of code knows | ||
self.emit("error", err, this); | ||
} else { | ||
delete self.waitingToOpen[connection.id]; | ||
// Ensure we only fire an error if there was an operation to avoid duplicate errors | ||
performedOperation = true; | ||
} else if(err && self.openConnections[connection.id] != null){ | ||
// Add to list of error connections | ||
self.connectionsWithErrors[connection.id] = connection; | ||
// Remove from list of open connections | ||
delete self.openConnections[connection.id]; | ||
// Ensure we only fire an error if there was an operation to avoid duplicate errors | ||
performedOperation = true; | ||
} else if(!err && self.waitingToOpen[connection.id] != null){ | ||
// Remove from list of waiting to connect | ||
delete self.waitingToOpen[this.id]; | ||
delete self.waitingToOpen[connection.id]; | ||
// Add to list of open connections | ||
self.openConnections[this.id] = this; | ||
} | ||
self.openConnections[connection.id] = connection; | ||
// Ensure we only fire an error if there was an operation to avoid duplicate errors | ||
performedOperation = true; | ||
} else { | ||
fireError = false; | ||
} | ||
    // Check if we are done, meaning that there are no more connections waiting to open  | ||
if(Object.keys(self.waitingToOpen).length == 0) { | ||
// Emit pool is ready | ||
self.emit("poolReady"); | ||
if(Object.keys(self.waitingToOpen).length == 0 && performedOperation) { | ||
// If we have any errors notify the application, only fire if we don't have the element already in | ||
// errors | ||
if(Object.keys(self.connectionsWithErrors).length > 0 && fireError) { | ||
// Set pool type to not connected | ||
self._poolState = 'not connected'; | ||
// Emit error | ||
self.emit("error", err, connection); | ||
} else { | ||
// Set pool state to connecting | ||
self._poolState = 'connected'; | ||
// Emit pool is ready | ||
self.emit("poolReady"); | ||
} | ||
} | ||
@@ -72,3 +124,7 @@ } | ||
// Start method, will throw error if no listeners are available | ||
// Pass in an instance of the listener that contains the api for | ||
// finding callbacks for a given message etc. | ||
ConnectionPool.prototype.start = function() { | ||
var self = this; | ||
if(this.listeners("poolReady").length == 0) { | ||
@@ -78,10 +134,29 @@ throw "pool must have at least one listener ready that responds to the [poolReady] event"; | ||
// Set pool state to connecting | ||
this._poolState = 'connecting'; | ||
// Let's boot up all the instances | ||
for(var i = 0; i < this.socketOptions.poolSize; i++) { | ||
for(var i = 0; i < this.socketOptions.poolSize; i++) { | ||
// Create a new connection instance | ||
var connection = new Connection(i, this.socketOptions); | ||
var connection = new Connection(this.connectionId++, this.socketOptions); | ||
// Add connection to list of waiting connections | ||
this.waitingToOpen[connection.id] = connection; | ||
// Add a connection handler | ||
this.waitingToOpen[connection.id] = connection; | ||
connection.on("connect", connectHandler(this)); | ||
connection.on("error", connectHandler(this)); | ||
connection.on("close", connectHandler(this)); | ||
connection.on("end", connectHandler(this)); | ||
connection.on("timeout", connectHandler(this)); | ||
connection.on("parseError", function(err) { | ||
// Set pool type to not connected | ||
self._poolState = 'not connected'; | ||
// Only close the connection if it's still connected | ||
if(self.isConnected()) self.stop(); | ||
// Emit the error | ||
self.emit("parseError", err); | ||
}); | ||
connection.on("message", function(message) { | ||
self.emit("message", message); | ||
}); | ||
// Start connection | ||
@@ -92,11 +167,71 @@ connection.start(); | ||
// Restart a connection pool (on a close the pool might be in a wrong state) | ||
ConnectionPool.prototype.restart = function() { | ||
// Close all connections | ||
this.stop(); | ||
// Now restart the pool | ||
this.start(); | ||
} | ||
// Stop the connections in the pool | ||
ConnectionPool.prototype.stop = function() { | ||
// Set not connected | ||
this._poolState = 'not connected'; | ||
// Get all open connections | ||
var keys = Object.keys(this.openConnections); | ||
// Force close all open sockets | ||
for(var i = 0; i < keys.length; i++) { | ||
this.openConnections[keys[i]].close(); | ||
} | ||
// Get all error connections | ||
var keys = Object.keys(this.connectionsWithErrors); | ||
// Force close all error sockets | ||
for(var i = 0; i < keys.length; i++) { | ||
this.connectionsWithErrors[keys[i]].close(); | ||
} | ||
// Get all waiting to open connections | ||
var keys = Object.keys(this.waitingToOpen); | ||
// Force close all waiting sockets | ||
for(var i = 0; i < keys.length; i++) { | ||
this.waitingToOpen[keys[i]].close(); | ||
} | ||
// Clear out all the connection variables | ||
this.waitingToOpen = {}; | ||
this.connectionsWithErrors = {}; | ||
this.openConnections = {}; | ||
// Emit a close event so people can track the event | ||
this.emit("close"); | ||
} | ||
// Check the status of the connection | ||
ConnectionPool.prototype.isConnected = function() { | ||
return Object.keys(this.waitingToOpen).length == 0 | ||
&& Object.keys(this.connectionsWithErrors).length == 0 | ||
&& Object.keys(this.openConnections).length > 0 && this._poolState === 'connected' | ||
&& this.openConnections[Object.keys(this.openConnections)[0]].isConnected(); | ||
} | ||
// Checkout a connection from the pool for usage, or grab a specific pool instance | ||
ConnectionPool.prototype.checkoutConnection = function(id) { | ||
// If we have an id return that specific connection | ||
if(id != null) return this.openConnections[id]; | ||
// Otherwise let's pick one using roundrobin | ||
var keys = Object.keys(this.openConnections); | ||
return this.openConnections[(keys[(this.currentConnectionIndex++ % keys.length)])] | ||
} | ||
ConnectionPool.prototype.getAllConnections = function() { | ||
return this.openConnections; | ||
} | ||
// Remove all non-needed event listeners | ||
ConnectionPool.prototype.removeAllEventListeners = function() { | ||
this.removeAllListeners("close"); | ||
this.removeAllListeners("error"); | ||
} | ||
@@ -117,1 +252,9 @@ | ||
var utils = require('./connection_utils'), | ||
inherits = require('util').inherits, | ||
net = require('net'), | ||
binary_utils = require('../bson/binary_utils'), | ||
EventEmitter = require("events").EventEmitter; | ||
debug = require('util').debug, | ||
inspect = require('util').inspect, | ||
EventEmitter = require('events').EventEmitter, | ||
inherits = require('util').inherits, | ||
binaryutils = require('../bson/binary_utils'); | ||
var Connection = exports.Connection = function(id, socketOptions) { | ||
// Store all socket options | ||
this.socketOptions = socketOptions; | ||
this.socketOptions = socketOptions ? socketOptions : {host:'localhost', port:27017}; | ||
// Id for the connection | ||
@@ -19,2 +22,3 @@ this.id = id; | ||
this.maxBsonSize = socketOptions.maxBsonSize ? socketOptions.maxBsonSize : global.DEFAULT_MAX_BSON_SIZE; | ||
// Contains the current message bytes | ||
@@ -28,4 +32,10 @@ this.buffer = null; | ||
this.stubBuffer = 0; | ||
// Just keeps list of events we allow | ||
resetHandlers(this, false); | ||
} | ||
// Set max bson size | ||
Connection.DEFAULT_MAX_BSON_SIZE = 4 * 1024 * 1024 * 4 * 3; | ||
// Inherit from EventEmitter so the connection can emit events | ||
@@ -35,4 +45,4 @@ inherits(Connection, EventEmitter); | ||
Connection.prototype.start = function() { | ||
console.dir(this.socketOptions) | ||
// Set up event emitter | ||
EventEmitter.call(this); | ||
// Create new connection instance | ||
@@ -42,4 +52,4 @@ this.connection = net.createConnection(this.socketOptions.port, this.socketOptions.host); | ||
this.connection.on("connect", connectHandler(this)); | ||
this.connection.on("data", dataHandler(this)); | ||
this.connection.on("end", endHandler(this)); | ||
this.connection.on("data", createDataHandler(this)); | ||
// this.connection.on("end", endHandler(this)); | ||
this.connection.on("timeout", timeoutHandler(this)); | ||
@@ -51,2 +61,45 @@ this.connection.on("drain", drainHandler(this)); | ||
// Check if the sockets are live | ||
Connection.prototype.isConnected = function() { | ||
return this.connected; | ||
} | ||
// Write the data out to the socket | ||
Connection.prototype.write = function(command, callback) { | ||
try { | ||
// If we have a list of commands to be executed on the same socket | ||
if(Array.isArray(command)) { | ||
for(var i = 0; i < command.length; i++) { | ||
var t = this.connection.write(command[i].toBinary()); | ||
} | ||
} else { | ||
var r = this.connection.write(command.toBinary()); | ||
} | ||
} catch (err) { | ||
if(typeof callback === 'function') callback(err); | ||
} | ||
} | ||
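
The write method above accepts either a single command or an array of commands to be flushed on the same socket; each command only needs a toBinary() method returning a Buffer, and any serialization or write error is routed to the optional callback. A hedged sketch with a stand-in command object; the require path, the socketOptions contents and the placeholder payload are all assumptions:

    // Path and socketOptions are assumptions; the constructor itself is shown above.
    var Connection = require('./lib/mongodb/connection/connection').Connection;
    var connection = new Connection(0, {host: 'localhost', port: 27017, timeout: 0, noDelay: true});

    connection.on("connect", function(err, conn) {
      // Stand-in command: anything exposing toBinary() that returns a Buffer is accepted
      var fakeCommand = { toBinary: function() { return new Buffer([4, 0, 0, 0]); } };

      // Single command
      conn.write(fakeCommand, function(err) {
        if (err) console.error("write failed", err);
      });

      // Several commands written back to back on the same socket
      conn.write([fakeCommand, fakeCommand], function(err) {
        if (err) console.error("batched write failed", err);
      });
    });

    connection.start();
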
// Force the closure of the connection | ||
Connection.prototype.close = function() { | ||
// clear out all the listeners | ||
resetHandlers(this, true); | ||
// destroy connection | ||
this.connection.destroy(); | ||
} | ||
// Reset all handlers | ||
var resetHandlers = function(self, clearListeners) { | ||
self.eventHandlers = {error:[], connect:[], close:[], end:[], timeout:[], parseError:[], message:[]}; | ||
// If we want to clear all the listeners | ||
if(clearListeners) { | ||
var keys = Object.keys(self.eventHandlers); | ||
// Remove all listeners | ||
for(var i = 0; i < keys.length; i++) { | ||
self.connection.removeAllListeners(keys[i]); | ||
} | ||
} | ||
} | ||
// | ||
@@ -60,3 +113,3 @@ // Handlers | ||
// Set options on the socket | ||
this.setEncoding(self.socketOptions.encoding); | ||
// this.setEncoding(self.socketOptions.encoding); | ||
this.setTimeout(self.socketOptions.timeout); | ||
@@ -72,11 +125,10 @@ this.setNoDelay(self.socketOptions.noDelay); | ||
// Emit the connect event with no error | ||
self.emit("connect", null); | ||
self.emit("connect", null, self); | ||
} | ||
} | ||
var dataHandler = function(self) { | ||
var createDataHandler = exports.Connection.createDataHandler = function(self) { | ||
// We need to handle the parsing of the data | ||
// and emit the messages when there is a complete one | ||
return function(data) { | ||
// Parse until we are done with the data | ||
@@ -88,17 +140,16 @@ while(data.length > 0) { | ||
var remainingBytesToRead = self.sizeOfMessage - self.bytesRead; | ||
// Check if the current chunk contains the rest of the message | ||
if(remainingBytesToRead > data.length) { | ||
// Copy the new data into the existing buffer (should have been allocated when we knew the message size) | ||
data.copy(self.buffer, self.bytesRead); | ||
// Adjust the number of bytes read so it points to the correct index in the buffer | ||
self.bytesRead = self.bytesRead + remainingBytesToRead; | ||
self.bytesRead = self.bytesRead + data.length; | ||
// Reset state of buffer | ||
data = new Buffer(0); | ||
} else { | ||
// Copy the missing part of the data into our current buffer | ||
data.copy(self.buffer, self.bytesRead, 0, remainingBytesToRead); | ||
// Slice the overflow into a new buffer that we will then re-parse | ||
data = data.slice(self.sizeOfMessage); | ||
data = data.slice(remainingBytesToRead); | ||
@@ -108,9 +159,16 @@ // Emit current complete message | ||
self.emit("message", self.buffer); | ||
} finally { | ||
// Reset state of buffer | ||
self.buffer = null; | ||
self.sizeOfMessage = 0; | ||
self.bytesRead = 0; | ||
self.stubBuffer = null; | ||
} catch(err) { | ||
// We got a parse Error fire it off then keep going | ||
self.emit("parseError", {err:"socketHandler", trace:err, bin:buffer, parseState:{ | ||
sizeOfMessage:self.sizeOfMessage, | ||
bytesRead:self.bytesRead, | ||
stubBuffer:self.stubBuffer}}); | ||
} | ||
// Reset state of buffer | ||
self.buffer = null; | ||
self.sizeOfMessage = 0; | ||
self.bytesRead = 0; | ||
self.stubBuffer = null; | ||
} | ||
@@ -124,20 +182,15 @@ } else { | ||
if(self.stubBuffer.length + data.length > 4) { | ||
// Prepad the data | ||
var newData = new Buffer(self.stubBuffer.length + data.length); | ||
self.stubBuffer.copy(newData, 0); | ||
data.copy(newData, self.stubBuffer.length); | ||
// Reassign for parsing | ||
data = newData; | ||
// Create temp buffer to keep the 4 bytes for the size | ||
var messageSizeBuffer = new Buffer(4); | ||
// Copy in the stubBuffer data | ||
self.stubBuffer.copy(messageSizeBuffer, 0); | ||
// Copy the remaining (4 - stubBuffer.length) bytes needed to determine the size | ||
data.copy(messageSizeBuffer, self.stubBuffer.length, 0, (4 - self.stubBuffer.length)) | ||
// Determine the message Size | ||
self.sizeOfMessage = binary_utils.decodeUInt32(messageSizeBuffer, 0) | ||
// Do a single allocation for the buffer | ||
self.buffer = new Buffer(self.sizeOfMessage); | ||
// Set bytes read | ||
self.bytesRead = 4; | ||
// Slice the data so we can keep parsing | ||
data = data.slice((4 - self.stubBuffer.length)); | ||
// Null out stub buffer | ||
// Reset state of buffer | ||
self.buffer = null; | ||
self.sizeOfMessage = 0; | ||
self.bytesRead = 0; | ||
self.stubBuffer = null; | ||
} else { | ||
@@ -155,11 +208,9 @@ | ||
} else { | ||
if(data.length > 4) { | ||
// Retrieve the message size | ||
var sizeOfMessage = binaryutils.decodeUInt32(data, 0); | ||
// Write the data into the buffer | ||
if(sizeOfMessage > data.length) { | ||
self.buffer = new Buffer(sizeOfMessage); | ||
// Ensure that the size of message is larger than 0 and less than the max allowed | ||
if(sizeOfMessage > 4 && sizeOfMessage < self.maxBsonSize && sizeOfMessage > data.length) { | ||
self.buffer = new Buffer(sizeOfMessage); | ||
// Copy all the data into the buffer | ||
@@ -176,6 +227,5 @@ data.copy(self.buffer, 0); | ||
} else if(sizeOfMessage == data.length) { | ||
} else if(sizeOfMessage > 4 && sizeOfMessage < self.maxBsonSize && sizeOfMessage == data.length) { | ||
try { | ||
emit("message", data); | ||
} finally { | ||
self.emit("message", data); | ||
// Reset state of buffer | ||
@@ -186,8 +236,41 @@ self.buffer = null; | ||
self.stubBuffer = null; | ||
} | ||
} | ||
// Exit parsing loop | ||
data = new Buffer(0); | ||
} catch (err) { | ||
// We got a parse Error fire it off then keep going | ||
self.emit("parseError", {err:"socketHandler", trace:err, bin:self.buffer, parseState:{ | ||
sizeOfMessage:self.sizeOfMessage, | ||
bytesRead:self.bytesRead, | ||
stubBuffer:self.stubBuffer}}); | ||
} | ||
} else if(sizeOfMessage <= 4 || sizeOfMessage > self.maxBsonSize) { | ||
// We got a parse Error fire it off then keep going | ||
self.emit("parseError", {err:"socketHandler", trace:null, bin:data, parseState:{ | ||
sizeOfMessage:sizeOfMessage, | ||
bytesRead:0, | ||
buffer:null, | ||
stubBuffer:null}}); | ||
// Clear out the state of the parser | ||
self.buffer = null; | ||
self.sizeOfMessage = 0; | ||
self.bytesRead = 0; | ||
self.stubBuffer = null; | ||
// Exit parsing loop | ||
data = new Buffer(0); | ||
} else { | ||
self.emit("message", data.slice(0, sizeOfMessage)); | ||
// Reset state of buffer | ||
self.buffer = null; | ||
self.sizeOfMessage = 0; | ||
self.bytesRead = 0; | ||
self.stubBuffer = null; | ||
// Copy rest of message | ||
data = data.slice(sizeOfMessage); | ||
} | ||
} else { | ||
// Create a buffer that contains the space for the non-complete message | ||
@@ -203,5 +286,2 @@ self.stubBuffer = new Buffer(data.length) | ||
} | ||
console.log("========================= data"); | ||
} | ||
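
The data handler above implements the wire-protocol framing rule: every message starts with its total length as a 32-bit little-endian integer, chunks shorter than four bytes are stashed in stubBuffer, and any size outside (4, maxBsonSize) raises a parseError. A standalone sketch of the same slicing logic, with hypothetical names, stripped of the stub-buffer and error bookkeeping:

    // Split a Buffer of concatenated wire messages into complete frames plus a remainder.
    function splitMessages(buffer) {
      var messages = [];
      while (buffer.length >= 4) {
        // 32-bit little-endian length prefix, same decode as in the handler above
        var size = buffer[0] | buffer[1] << 8 | buffer[2] << 16 | buffer[3] << 24;
        if (size < 4 || size > buffer.length) break; // invalid or incomplete frame
        messages.push(buffer.slice(0, size));
        buffer = buffer.slice(size);
      }
      return {messages: messages, rest: buffer};
    }
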
@@ -212,3 +292,6 @@ } | ||
return function() { | ||
console.log("========================= end"); | ||
// Set connected to false | ||
self.connected = false; | ||
// Emit end event | ||
self.emit("end", {err: 'connection received Fin packet from [' + self.socketOptions.host + ':' + self.socketOptions.port + ']'}, self); | ||
} | ||
@@ -219,3 +302,3 @@ } | ||
return function() { | ||
console.log("========================= timeout"); | ||
self.emit("end", {err: 'connection to [' + self.socketOptions.host + ':' + self.socketOptions.port + '] timed out'}, self); | ||
} | ||
@@ -226,3 +309,2 @@ } | ||
return function() { | ||
console.log("========================= drain"); | ||
} | ||
@@ -233,3 +315,6 @@ } | ||
return function(err) { | ||
console.log("========================= error"); | ||
// Set connected to false | ||
self.connected = false; | ||
// Emit error | ||
self.emit("error", {err: 'failed to connect to [' + self.socketOptions.host + ':' + self.socketOptions.port + ']'}, self); | ||
} | ||
@@ -240,13 +325,15 @@ } | ||
return function(hadError) { | ||
// Set connected to false | ||
self.connected = false; | ||
// If we have an error during the connection phase | ||
if(hadError && !self.connected) { | ||
self.emit("connect", {err: 'failed to connect to [' + self.socketOptions.host + ':' + self.socketOptions.port + ']'}, self); | ||
if(hadError && !self.connected) { | ||
self.emit("error", {err: 'failed to connect to [' + self.socketOptions.host + ':' + self.socketOptions.port + ']'}, self); | ||
} else { | ||
self.emit("close", {err: 'connection closed to [' + self.socketOptions.host + ':' + self.socketOptions.port + ']'}, self); | ||
} | ||
console.log("========================= close"); | ||
} | ||
} | ||
// Some basic defaults | ||
Connection.DEFAULT_PORT = 27017; | ||
@@ -253,0 +340,0 @@ |
@@ -43,3 +43,3 @@ var QueryCommand = require('./commands/query_command').QueryCommand, | ||
*/ | ||
var Cursor = exports.Cursor = function(db, collection, selector, fields, skip, limit, sort, hint, explain, snapshot, timeout, tailable, batchSize, slaveOk) { | ||
var Cursor = exports.Cursor = function(db, collection, selector, fields, skip, limit, sort, hint, explain, snapshot, timeout, tailable, batchSize, slaveOk, raw) { | ||
this.db = db; | ||
@@ -59,2 +59,3 @@ this.collection = collection; | ||
this.slaveOk = slaveOk == null ? collection.slaveOk : slaveOk; | ||
this.raw = raw == null ? false : raw; | ||
@@ -126,3 +127,3 @@ this.totalNumberOfRecords = 0; | ||
this.each(function(err, item) { | ||
this.each(function(err, item) { | ||
if(err != null) return callback(err, null); | ||
@@ -135,2 +136,3 @@ | ||
items = null; | ||
self.items = []; | ||
} | ||
@@ -394,3 +396,2 @@ }); | ||
if(this.snapshot != null) specialSelector['$snapshot'] = true; | ||
return new QueryCommand(this.db, this.collectionName, queryOptions, this.skipValue, numberToReturn, specialSelector, this.fields); | ||
@@ -486,3 +487,3 @@ } else { | ||
self.db.executeCommand(cmd, {read:true}, commandHandler); | ||
self.db._executeQueryCommand(cmd, {read:true, raw:self.raw}, commandHandler); | ||
commandHandler = null; | ||
@@ -521,3 +522,3 @@ } else if(self.items.length) { | ||
// Execute the command | ||
self.db.executeCommand(getMoreCommand, {read:true}, function(err, result) { | ||
self.db._executeQueryCommand(getMoreCommand, {read:true, raw:self.raw}, function(err, result) { | ||
try { | ||
@@ -605,3 +606,3 @@ if(err != null) callback(err, null); | ||
function execute(command) { | ||
self.db.executeCommand(command, {read:true}, function(err,result) { | ||
self.db._executeQueryCommand(command, {read:true, raw:self.raw}, function(err,result) { | ||
if(err) { | ||
@@ -666,14 +667,14 @@ stream.emit('error', err); | ||
var command = new KillCursorCommand(this.db, [this.cursorId]); | ||
this.db.executeCommand(command, {read:true}, null); | ||
this.db._executeQueryCommand(command, {read:true, raw:self.raw}, null); | ||
} catch(err) {} | ||
} | ||
// Reset cursor id | ||
this.cursorId = self.db.bson_serializer.Long.fromInt(0); | ||
// Set to closed status | ||
this.state = Cursor.CLOSED; | ||
if(callback) { | ||
process.nextTick(function(){ | ||
callback(null, self); | ||
self.items = null; | ||
}); | ||
callback(null, self); | ||
self.items = []; | ||
} | ||
@@ -680,0 +681,0 @@ |
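
The new raw flag threaded through the cursor and into _executeQueryCommand means replies can be handed back as undeserialized BSON Buffers instead of parsed documents. A usage sketch, assuming an open collection handle and assuming the flag is also exposed through collection.find; that wiring is not part of this hunk:

    // Assumes collection.find forwards a raw:true option to the Cursor; not shown here.
    collection.find({}, {raw: true}, function(err, cursor) {
      cursor.nextObject(function(err, doc) {
        // In raw mode each document arrives as a Buffer of BSON bytes
        console.log(Buffer.isBuffer(doc));
      });
    });
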
@@ -6,6 +6,5 @@ var QueryCommand = require('./commands/query_command').QueryCommand, | ||
Admin = require('./admin').Admin, | ||
Connection = require('./connection').Connection, | ||
Collection = require('./collection').Collection, | ||
Server = require('./connections/server').Server, | ||
ReplSetServers = require('./connections/repl_set_servers').ReplSetServers, | ||
Server = require('./connection/server').Server, | ||
ReplSetServers = require('./connection/repl_set_servers').ReplSetServers, | ||
Cursor = require('./cursor').Cursor, | ||
@@ -48,2 +47,3 @@ EventEmitter = require('events').EventEmitter, | ||
this.auths = []; | ||
this.logger = this.options.logger != null | ||
@@ -61,25 +61,76 @@ && (typeof this.options.logger.debug == 'function') | ||
// Add a listener for the reconnect event | ||
this.on("reconnect", function() { | ||
// Number of current auths | ||
var authLength = self.auths.length; | ||
var numberOfReadyAuth = 0; | ||
this.tag = new Date().getTime(); | ||
// Contains the callbacks | ||
this._mongodbHandlers = {_mongodbCallbacks : {}, _notReplied : {}}; | ||
// Just keeps list of events we allow | ||
this.eventHandlers = {error:[], parseError:[], poolReady:[], message:[], close:[]}; | ||
if(authLength > 0) { | ||
// If we have any auths fire off the auth message to all the connections | ||
for(var i = 0; i < authLength; i++) { | ||
// Execute auth commands | ||
self.authenticate(self.auths[i].username, self.auths[i].password, function(err, result) { | ||
numberOfReadyAuth = numberOfReadyAuth + 1; | ||
// Controls serialization options | ||
this.serializeFunctions = this.options.serializeFunctions != null ? this.options.serializeFunctions : false; | ||
// Raw mode | ||
this.raw = this.options.raw != null ? this.options.raw : false; | ||
// Retry information | ||
this.retryMiliSeconds = this.options.retryMiliSeconds != null ? this.options.retryMiliSeconds : 5000; | ||
this.numberOfRetries = this.options.numberOfRetries != null ? this.options.numberOfRetries : 5; | ||
// Reaper information | ||
this.reaperInterval = this.options.reaperInterval != null ? this.options.reaperInterval : 1000; | ||
this.reaperTimeout = this.options.reaperTimeout != null ? this.options.reaperTimeout : 30000; | ||
// Start reaper, cleans up timed out calls | ||
this.reaperIntervalId = setInterval(reaper(this, this.reaperTimeout), this.reaperInterval); | ||
}; | ||
if(numberOfReadyAuth == self.auths.length) { | ||
self.serverConfig.emit("resend"); | ||
} | ||
}); | ||
} | ||
} else { | ||
self.serverConfig.emit("resend"); | ||
// Does forced cleanup of callbacks if a call never returns | ||
var reaper = function(dbInstance, timeout) { | ||
return function() { | ||
// Only trigger reaper if it's still valid and we have a proper connection pool | ||
if(dbInstance.reaperIntervalId != null) { | ||
// Current time | ||
var currentTime = new Date().getTime(); | ||
// If it's no longer connected clear out the interval | ||
if(dbInstance.serverConfig.connectionPool != null && !dbInstance.serverConfig.isConnected() && dbInstance.reaperIntervalId != null) { | ||
// Clear the interval | ||
clearInterval(dbInstance.reaperIntervalId); | ||
// this._mongodbHandlers = {_mongodbCallbacks : {}, _notReplied : {}}; | ||
// Trigger all remaining callbacks with timeout error | ||
if(dbInstance._mongodbHandlers != null && dbInstance._mongodbHandlers._notReplied != null) { | ||
var keys = Object.keys(dbInstance._mongodbHandlers._notReplied); | ||
// Iterate over all callbacks | ||
for(var i = 0; i < keys.length; i++) { | ||
// Get callback | ||
var callback = dbInstance._mongodbHandlers._mongodbCallbacks[keys[i]]; | ||
// Cleanup | ||
delete dbInstance._mongodbHandlers._notReplied[keys[i]]; | ||
delete dbInstance._mongodbHandlers._mongodbCallbacks[keys[i]]; | ||
// Perform callback | ||
callback(new Error("operation timed out"), null); | ||
} | ||
} | ||
} else { | ||
// Trigger all callbacks that went over timeout period | ||
if(dbInstance._mongodbHandlers != null && dbInstance._mongodbHandlers._notReplied != null) { | ||
var keys = Object.keys(dbInstance._mongodbHandlers._notReplied); | ||
// Iterate over all callbacks | ||
for(var i = 0; i < keys.length; i++) { | ||
// Get info element | ||
var info = dbInstance._mongodbHandlers._notReplied[keys[i]]; | ||
// If it's timed out let's remove the callback and return an error | ||
if((currentTime - info.start) > timeout) { | ||
// Get callback | ||
var callback = dbInstance._mongodbHandlers._mongodbCallbacks[keys[i]]; | ||
// Cleanup | ||
delete dbInstance._mongodbHandlers._notReplied[keys[i]]; | ||
delete dbInstance._mongodbHandlers._mongodbCallbacks[keys[i]]; | ||
// Perform callback | ||
callback(new Error("operation timed out"), null); | ||
} | ||
} | ||
} | ||
} | ||
} | ||
}); | ||
}; | ||
} | ||
} | ||
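
The reaper above periodically walks the _notReplied registry and fails every callback whose call is older than reaperTimeout (or all of them once the pool has dropped). The same idea in isolation, with hypothetical names:

    // Standalone sketch: entries carry the time they were issued; anything older
    // than `timeout` is removed and failed with a timeout error.
    function startReaper(pending, timeout, interval) {
      return setInterval(function() {
        var now = new Date().getTime();
        Object.keys(pending).forEach(function(id) {
          if (now - pending[id].start > timeout) {
            var callback = pending[id].callback;
            delete pending[id];
            callback(new Error("operation timed out"), null);
          }
        });
      }, interval);
    }

    // Usage: register a pending call (backdated here so the first sweep reaps it)
    var pending = {};
    pending['42'] = {start: new Date().getTime() - 60000, callback: function(err) { console.log(err.message); }};
    var reaperId = startReaper(pending, 30000, 1000);
    // clearInterval(reaperId) once the connection closes
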
@@ -102,8 +153,12 @@ function validateDatabaseName(databaseName) { | ||
if(self.serverConfig instanceof Server || self.serverConfig instanceof ReplSetServers) { | ||
self.serverConfig.connect(self, function(err, result) { | ||
if(err != null) return callback(err, null); | ||
self.serverConfig.connect(self, {firstCall: true}, function(err, result) { | ||
if(err != null) { | ||
// Clear reaper interval | ||
if(self.reaperIntervalId != null) clearInterval(self.reaperIntervalId); | ||
// Return error from connection | ||
return callback(err, null); | ||
} | ||
// Callback | ||
return callback(null, self); | ||
}); | ||
} else { | ||
@@ -114,7 +169,27 @@ return callback(Error("Server parameter must be of type Server or ReplSetServers"), null); | ||
Db.prototype.close = function(callback) { | ||
Db.prototype.db = function(dbName) { | ||
// Create a new db instance | ||
var newDbInstance = new Db(dbName, this.serverConfig, this.options); | ||
// Add the instance to the list of approved db instances | ||
var allServerInstances = this.serverConfig.allServerInstances(); | ||
// Add ourselves to all server callback instances | ||
for(var i = 0; i < allServerInstances.length; i++) { | ||
var server = allServerInstances[i]; | ||
server.dbInstances.push(newDbInstance); | ||
} | ||
// Return new db object | ||
return newDbInstance; | ||
} | ||
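
Db.prototype.db returns a sibling handle that reuses the existing serverConfig and its socket pool, registering itself on every server instance so replies are routed to it as well. A rough usage sketch; the require paths are assumptions, while the constructor call mirrors the ones used in the test files of this release:

    // Require paths are assumptions; Db and Server are the classes referenced above.
    var Db = require('./lib/mongodb/db').Db,
        Server = require('./lib/mongodb/connection/server').Server;

    var db = new Db('app', new Server('127.0.0.1', 27017, {auto_reconnect: true, poolSize: 4}), {});
    db.open(function(err, db) {
      // No new sockets are opened; only the database name differs
      var logsDb = db.db('logs');
      logsDb.collection('entries', function(err, collection) {
        collection.insert({msg: 'hello'}, {safe: true}, function(err, docs) {
          db.close();
        });
      });
    });
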
Db.prototype.close = function(callback) { | ||
// Clear reaperId if it's set | ||
if(this.reaperIntervalId != null) { | ||
clearInterval(this.reaperIntervalId); | ||
} | ||
// Remove all listeners and close the connection | ||
this.serverConfig.removeAllListeners("reconnect"); | ||
this.serverConfig.close(callback); | ||
// Emit the close event | ||
if(typeof callback !== 'function') this.emit("close"); | ||
// Remove all listeners | ||
this.removeAllEventListeners(); | ||
// Clear out state of the connection | ||
@@ -276,27 +351,12 @@ this.state = "notConnected"; | ||
var logoutCommand = DbCommand.logoutCommand(self, {logout:1, socket:options['socket']}); | ||
// For all the connections let's execute the command | ||
var rawConnections = self.serverConfig.allRawConnections(); | ||
var numberOfExpectedReturns = rawConnections.length; | ||
for(var index = 0; index < numberOfExpectedReturns; index++) { | ||
// Execute the logout on all raw connections | ||
self.executeCommand(logoutCommand, {writer: rawConnections[index].connection}, function(err, result) { | ||
// Adjust the number of expected results | ||
numberOfExpectedReturns = numberOfExpectedReturns - 1; | ||
// If we are done let's evaluate | ||
if(numberOfExpectedReturns <= 0) { | ||
// Reset auth | ||
self.auths = []; | ||
// Handle any errors | ||
if(err == null && result.documents[0].ok == 1) { | ||
callback(null, true); | ||
} else { | ||
err != null ? callback(err, false) : callback(new Error(result.documents[0].errmsg), false); | ||
} | ||
} | ||
}); | ||
} | ||
self._executeQueryCommand(logoutCommand, {onAll:true}, function(err, result) { | ||
// Reset auth | ||
self.auths = []; | ||
// Handle any errors | ||
if(err == null && result.documents[0].ok == 1) { | ||
callback(null, true); | ||
} else { | ||
err != null ? callback(err, false) : callback(new Error(result.documents[0].errmsg), false); | ||
} | ||
}); | ||
} | ||
@@ -309,45 +369,40 @@ | ||
var self = this; | ||
// Add the auth details to the connection for reconnects or update existing if any | ||
var found = false; | ||
for(var i = 0; i < self.auths.length; i++) { | ||
// If we have found an existing auth, update the password | ||
if(self.auths[i].username == username) { | ||
found = true; | ||
self.auths[i].password = password; | ||
} | ||
} | ||
// Push the new auth if we have no previous record | ||
if(!found) self.auths.push({'username':username, 'password':password}); | ||
// Figure out the number of times we need to trigger | ||
var rawConnections = self.serverConfig.allRawConnections(); | ||
var numberOfExpectedReturns = rawConnections.length; | ||
// Execute the commands | ||
for(var i = 0; i < numberOfExpectedReturns; i++) { | ||
// Execute command | ||
var createNonceCallback = function(index) { | ||
return function(err, reply) { | ||
if(err == null) { | ||
// Nonce used to make authentication request with md5 hash | ||
var nonce = reply.documents[0].nonce; | ||
// Execute command | ||
self.executeCommand(DbCommand.createAuthenticationCommand(self, username, password, nonce), {writer: rawConnections[index].connection}, function(err, result) { | ||
// Adjust the number of expected results | ||
numberOfExpectedReturns = numberOfExpectedReturns - 1; | ||
// If we are done let's evaluate | ||
if(numberOfExpectedReturns <= 0) { | ||
if(err == null && result.documents[0].ok == 1) { | ||
callback(null, true); | ||
} else { | ||
err != null ? callback(err, false) : callback(new Error(result.documents[0].errmsg), false); | ||
} | ||
} | ||
}); | ||
} else { | ||
callback(err, null); | ||
self.auths = [{'username':username, 'password':password}]; | ||
// Get the number of connections in the pool to ensure we have authenticated all of them | ||
var numberOfConnections = Object.keys(this.serverConfig.allRawConnections()).length; | ||
var errorObject = null; | ||
// Execute the getnonce command against all connections | ||
this._executeQueryCommand(DbCommand.createGetNonceCommand(self), {onAll:true}, function(err, result, connection) { | ||
// Execute on all the connections | ||
if(err == null) { | ||
// Nonce used to make authentication request with md5 hash | ||
var nonce = result.documents[0].nonce; | ||
// Execute command | ||
self._executeQueryCommand(DbCommand.createAuthenticationCommand(self, username, password, nonce), {connection:connection}, function(err, result) { | ||
// Ensure we save any error | ||
if(err) { | ||
errorObject = err; | ||
} else if(result.documents[0].err != null || result.documents[0].errmsg != null){ | ||
errorObject = self.wrap(result.documents[0]); | ||
} | ||
} | ||
} | ||
this.executeCommand(DbCommand.createGetNonceCommand(self), {writer: rawConnections[i].connection}, createNonceCallback(i)); | ||
} | ||
// Count down | ||
numberOfConnections = numberOfConnections - 1; | ||
// If we are done with the callbacks return | ||
if(numberOfConnections <= 0) { | ||
if(errorObject == null && result.documents[0].ok == 1) { | ||
callback(errorObject, true); | ||
} else { | ||
callback(errorObject, false); | ||
} | ||
} | ||
}); | ||
} else { | ||
} | ||
}); | ||
}; | ||
@@ -413,3 +468,3 @@ | ||
try { | ||
var collection = new Collection(self, collectionName, self.pkFactory); | ||
var collection = new Collection(self, collectionName, self.pkFactory, options); | ||
} catch(err) { | ||
@@ -420,8 +475,8 @@ return callback(err, null); | ||
} | ||
// Create a new collection and return it | ||
self.executeCommand(DbCommand.createCreateCollectionCommand(self, collectionName, options), {read:false, safe:true}, function(err, result) { | ||
self._executeQueryCommand(DbCommand.createCreateCollectionCommand(self, collectionName, options), {read:false, safe:true}, function(err, result) { | ||
if(err == null && result.documents[0].ok == 1) { | ||
try { | ||
var collection = new Collection(self, collectionName, self.pkFactory); | ||
var collection = new Collection(self, collectionName, self.pkFactory, options); | ||
} catch(err) { | ||
@@ -447,3 +502,3 @@ return callback(err, null); | ||
Db.prototype.dropCollection = function(collectionName, callback) { | ||
this.executeCommand(DbCommand.createDropCollectionCommand(this, collectionName), function(err, result) { | ||
this._executeQueryCommand(DbCommand.createDropCollectionCommand(this, collectionName), function(err, result) { | ||
if(err == null && result.documents[0].ok == 1) { | ||
@@ -461,3 +516,3 @@ if(callback != null) return callback(null, true); | ||
Db.prototype.renameCollection = function(fromCollection, toCollection, callback) { | ||
this.executeCommand(DbCommand.createRenameCollectionCommand(this, fromCollection, toCollection), function(err, doc) { callback(err, doc); }); | ||
this._executeQueryCommand(DbCommand.createRenameCollectionCommand(this, fromCollection, toCollection), function(err, doc) { callback(err, doc); }); | ||
}; | ||
@@ -475,3 +530,3 @@ | ||
this.executeCommand(DbCommand.createGetLastErrorCommand(options, this), connectionOptions, function(err, error) { | ||
this._executeQueryCommand(DbCommand.createGetLastErrorCommand(options, this), connectionOptions, function(err, error) { | ||
callback(err, error && error.documents); | ||
@@ -494,3 +549,3 @@ }); | ||
Db.prototype.lastStatus = function(callback) { | ||
this.executeCommand(DbCommand.createGetLastStatusCommand(this), callback); | ||
this._executeQueryCommand(DbCommand.createGetLastStatusCommand(this), callback); | ||
}; | ||
@@ -502,3 +557,3 @@ | ||
Db.prototype.previousErrors = function(callback) { | ||
this.executeCommand(DbCommand.createGetPreviousErrorsCommand(this), function(err, error) { | ||
this._executeQueryCommand(DbCommand.createGetPreviousErrorsCommand(this), function(err, error) { | ||
callback(err, error.documents); | ||
@@ -513,3 +568,3 @@ }); | ||
if(callback == null) { callback = options; options = {}; } | ||
this.executeCommand(DbCommand.createDbCommand(this, command_hash), options, callback); | ||
this._executeQueryCommand(DbCommand.createDbCommand(this, command_hash, options), options, callback); | ||
}; | ||
@@ -521,3 +576,3 @@ | ||
Db.prototype.executeDbAdminCommand = function(command_hash, callback) { | ||
this.executeCommand(DbCommand.createAdminDbCommand(this, command_hash), callback); | ||
this._executeQueryCommand(DbCommand.createAdminDbCommand(this, command_hash), callback); | ||
}; | ||
@@ -529,3 +584,3 @@ | ||
Db.prototype.resetErrorHistory = function(callback) { | ||
this.executeCommand(DbCommand.createResetErrorHistoryCommand(this), callback); | ||
this._executeQueryCommand(DbCommand.createResetErrorHistoryCommand(this), callback); | ||
}; | ||
@@ -539,3 +594,3 @@ | ||
var command = DbCommand.createCreateIndexCommand(this, collectionName, fieldOrSpec, options); | ||
this.executeCommand(command, {read:false, safe:true}, function(err, result) { | ||
this._executeInsertCommand(command, {read:false, safe:true}, function(err, result) { | ||
if(err != null) return callback(err, null); | ||
@@ -565,3 +620,3 @@ | ||
if(!collectionInfo[index_name]) { | ||
self.executeCommand(command, {read:false, safe:true}, function(err, result) { | ||
self._executeInsertCommand(command, {read:false, safe:true}, function(err, result) { | ||
if(err != null) return callback(err, null); | ||
@@ -586,3 +641,3 @@ | ||
Db.prototype.cursorInfo = function(callback) { | ||
this.executeCommand(DbCommand.createDbCommand(this, {'cursorInfo':1}), function(err, result) { | ||
this._executeQueryCommand(DbCommand.createDbCommand(this, {'cursorInfo':1}), function(err, result) { | ||
callback(err, result.documents[0]); | ||
@@ -596,3 +651,3 @@ }); | ||
Db.prototype.dropIndex = function(collectionName, indexName, callback) { | ||
this.executeCommand(DbCommand.createDropIndexCommand(this, collectionName, indexName), callback); | ||
this._executeQueryCommand(DbCommand.createDropIndexCommand(this, collectionName, indexName), callback); | ||
}; | ||
@@ -642,3 +697,3 @@ | ||
Db.prototype.dropDatabase = function(callback) { | ||
this.executeCommand(DbCommand.createDropDatabaseCommand(this), function(err, result) { | ||
this._executeQueryCommand(DbCommand.createDropDatabaseCommand(this), function(err, result) { | ||
callback(err, result); | ||
@@ -648,175 +703,255 @@ }); | ||
/** | ||
Execute db command | ||
**/ | ||
Db.prototype.executeCommand = function(db_command, options, callback) { | ||
var self = this; | ||
// Register a handler | ||
Db.prototype._registerHandler = function(db_command, raw, connection, callback) { | ||
// Add the callback to the list of handlers | ||
this._mongodbHandlers._mongodbCallbacks[db_command.getRequestId().toString()] = callback; | ||
// Add the information about the reply | ||
this._mongodbHandlers._notReplied[db_command.getRequestId().toString()] = {start: new Date().getTime(), 'raw': raw, 'connection':connection}; | ||
} | ||
// Remove a handler | ||
Db.prototype._removeHandler = function(db_command) { | ||
var id = typeof db_command === 'number' || typeof db_command === 'string' ? db_command.toString() : db_command.getRequestId().toString(); | ||
// Ensure we have an entry (might have been removed by the reaper) | ||
if(this._mongodbHandlers != null && this._mongodbHandlers._mongodbCallbacks[id] != null) { | ||
var callback = this._mongodbHandlers._mongodbCallbacks[id]; | ||
var info = this._mongodbHandlers._notReplied[id]; | ||
// Remove the handler | ||
delete this._mongodbHandlers._mongodbCallbacks[id]; | ||
delete this._mongodbHandlers._notReplied[id] | ||
// Return the callback | ||
return {'callback':callback, 'info':info}; | ||
} else { | ||
return null; | ||
} | ||
} | ||
Db.prototype._findHandler = function(id) { | ||
var callback = this._mongodbHandlers._mongodbCallbacks[id.toString()]; | ||
var info = this._mongodbHandlers._notReplied[id.toString()]; | ||
// Return the callback | ||
return {'callback':callback, 'info':info}; | ||
} | ||
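
Outgoing commands are now tracked per request id: _registerHandler stores the callback plus its raw/connection metadata, _findHandler resolves it when a reply's responseTo comes back, and _removeHandler deletes it, returning null if the reaper already claimed it. The bookkeeping reduced to its essentials, with hypothetical names:

    // Isolated sketch of the request-id bookkeeping used above.
    var handlers = {_mongodbCallbacks: {}, _notReplied: {}};

    function register(requestId, callback) {
      handlers._mongodbCallbacks[requestId] = callback;
      handlers._notReplied[requestId] = {start: new Date().getTime()};
    }

    function remove(requestId) {
      var callback = handlers._mongodbCallbacks[requestId];
      if (callback == null) return null; // already reaped
      delete handlers._mongodbCallbacks[requestId];
      delete handlers._notReplied[requestId];
      return callback;
    }

    register('1001', function(err, reply) { console.log('got reply', reply); });
    var cb = remove('1001');
    if (cb != null) cb(null, {documents: []});
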
var __executeQueryCommand = function(self, db_command, options, callback) { | ||
// Options unpacking | ||
var read = options['read'] != null ? options['read'] : false; | ||
var raw = options['raw'] != null ? options['raw'] : self.raw; | ||
var onAll = options['onAll'] != null ? options['onAll'] : false; | ||
var specifiedConnection = options['connection'] != null ? options['connection'] : null; | ||
// If we got a callback object | ||
if(callback instanceof Function && !onAll) { | ||
// Get the outgoing request id | ||
var requestId = db_command.getRequestId().toString(); | ||
// Fetch either a reader or writer dependent on the specified read option | ||
var connection = read ? self.serverConfig.checkoutReader() : self.serverConfig.checkoutWriter(); | ||
// Override connection if needed | ||
connection = specifiedConnection != null ? specifiedConnection : connection; | ||
// Ensure we have a valid connection | ||
if(connection == null) return callback(new Error("no open connections")); | ||
// Register the handler in the data structure | ||
self._registerHandler(db_command, raw, connection, callback); | ||
var args = Array.prototype.slice.call(arguments, 1); | ||
callback = args.pop(); | ||
options = args.length ? args.shift() : {}; | ||
// Write the message out and handle any write errors | ||
connection.write(db_command, function(err) { | ||
if(err != null) { | ||
// Clean up listener and return error | ||
var callbackInstance = self._removeHandler(db_command); | ||
// Only call if the reaper has not removed it | ||
if(callbackInstance != null) { | ||
callbackInstance.callback(err); | ||
} | ||
} | ||
}); | ||
} else if(callback instanceof Function && onAll) { | ||
var connections = self.serverConfig.allRawConnections(); | ||
var keys = Object.keys(connections); | ||
var numberOfEntries = keys.length; | ||
// Options unpacking | ||
var read = options['read'] != null ? options['read'] : false; | ||
var safe = options['safe'] != null ? options['safe'] : false; | ||
// Signals both commands return a value | ||
var allReturn = options['allReturn'] != null ? options['allReturn'] : false; | ||
// Lets us pass in a writer to force the use of a connection (used for admin where we need to perform 2 calls against the same connection) | ||
var rawConnection = options['writer'] != null ? options['writer'] : null; | ||
// Go through all the connections | ||
for(var i = 0; i < keys.length; i++) { | ||
// Fetch a connection | ||
var connection = connections[keys[i]]; | ||
// Override connection if needed | ||
connection = specifiedConnection != null ? specifiedConnection : connection; | ||
// Ensure we have a valid connection | ||
if(connection == null) return callback(new Error("no open connections")); | ||
var errorCommand = null; | ||
if(safe) { | ||
errorCommand = DbCommand.createGetLastErrorCommand(safe, this); | ||
} | ||
// Contains the command we need to listen to | ||
var listenToCommand = null; | ||
// If we have a callback execute | ||
if(callback instanceof Function) { | ||
listenToCommand = errorCommand != null ? errorCommand : db_command; | ||
// If we need to return all the results let's do it | ||
if(allReturn && safe != null && safe != false) { | ||
// Handle the 2 callbacks ensuring we trap all errors | ||
var handleBothCommandsFunction = function() { | ||
// Number of expected results before a callback | ||
var numberOfResultsToBeReturned = 2; | ||
var results = []; | ||
// Handle the callbacks | ||
return function(err, result) { | ||
// Adjust number of expected results | ||
numberOfResultsToBeReturned = numberOfResultsToBeReturned - 1; | ||
// Add results and error | ||
results.push({err:err, result:result}); | ||
// Return if we got all the responses | ||
if(numberOfResultsToBeReturned <= 0) { | ||
// Detect if there was an error | ||
var errorIndex = 0; | ||
// Find the first error if any | ||
for(var i = 0; i < results.length; i++) { | ||
// Check for error | ||
if(results[i].err != null || results[i].result.documents[0].err != null) { | ||
errorIndex = i; | ||
break; | ||
} | ||
} | ||
// If the first command fails return that and ignore the next command result | ||
if(errorIndex > 0) { | ||
return callback(results[errorIndex].result.documents[0], results[errorIndex].result); | ||
} else if(results[0].err || !results[0].result.ok) { | ||
return callback(results[0].err, results[0].result); | ||
} else { | ||
// First command went ok, return second command results | ||
return callback(results[1].err, results[1].result); | ||
} | ||
} | ||
} | ||
}(); | ||
// Register the handler in the data structure | ||
self._registerHandler(db_command, raw, connection, callback); | ||
// Write the message out | ||
connection.write(db_command, function(err) { | ||
// Adjust the number of entries we need to process | ||
numberOfEntries = numberOfEntries - 1; | ||
// Remove listener | ||
if(err != null) { | ||
// Clean up listener and return error | ||
self._removeHandler(db_command); | ||
} | ||
// Fire actual command | ||
this.on(db_command.getRequestId().toString(), handleBothCommandsFunction); | ||
// Fire the error command | ||
this.on(errorCommand.getRequestId().toString(), handleBothCommandsFunction); | ||
} else { | ||
// Add the callback to the list of callbacks by the request id (mapping outgoing messages to correct callbacks) | ||
this.on(listenToCommand.getRequestId().toString(), callback); | ||
if(self.serverConfig.primary != null) { | ||
this.notReplied[listenToCommand.getRequestId().toString()] = new Date().getTime(); | ||
} | ||
} | ||
// No more entries to process, invoke the callback with the error | ||
if(numberOfEntries <= 0) { | ||
callback(err); | ||
} | ||
}); | ||
// Update the db_command request id | ||
db_command.updateRequestId(); | ||
} | ||
} else { | ||
// Get the outgoing request id | ||
var requestId = db_command.getRequestId().toString(); | ||
// Fetch either a reader or writer dependent on the specified read option | ||
var connection = read ? self.serverConfig.checkoutReader() : self.serverConfig.checkoutWriter(); | ||
// Override connection if needed | ||
connection = specifiedConnection != null ? specifiedConnection : connection; | ||
// Ensure we have a valid connection | ||
if(connection == null) return null; | ||
// Write the message out | ||
connection.write(db_command, function(err) { | ||
if(err != null) { | ||
// Emit the error | ||
self.emit("error", err); | ||
} | ||
}); | ||
} | ||
} | ||
try{ | ||
// Attempt forcing a reconnect if we have a replicaset server | ||
if(self.serverConfig instanceof ReplSetServers && !self.serverConfig.isConnected()) { | ||
// Initialize | ||
self.isInitializing = true; | ||
// Number of retries | ||
var retries = self.serverConfig.retries; | ||
// Attempts a reconnect | ||
var reconnectAttempt = function() { | ||
// Try reconnect | ||
self.serverConfig.connect(self, function(err, result) { | ||
// Initialize | ||
self.isInitializing = true; | ||
// Set retries | ||
retries = retries - 1; | ||
// If we failed, retry the connection | ||
if(err != null && retries > 0) { | ||
// Wait and try again | ||
setTimeout(reconnectAttempt, self.serverConfig.reconnectWait); | ||
} else { | ||
// Ensure we catch any errors happening and report them (especially important for replicaset servers) | ||
try { | ||
if(err != null && callback instanceof Function) return callback(err, null); | ||
// for the other instances fire the message | ||
var writer = read ? self.serverConfig.checkoutReader() : self.serverConfig.checkoutWriter(); | ||
// If we got safe set | ||
if(errorCommand != null) { | ||
writer.send([db_command, errorCommand], rawConnection); | ||
var __retryCommandOnFailure = function(self, retryInMilliseconds, numberOfTimes, command, db_command, options, callback) { | ||
// Number of retries done | ||
var numberOfRetriesDone = numberOfTimes; | ||
// The interval function triggers retries | ||
var intervalId = setInterval(function() { | ||
// Attempt a reconnect | ||
self.serverConfig.connect(self, {firstCall: false}, function(err, result) { | ||
// Adjust the number of retries done | ||
numberOfRetriesDone = numberOfRetriesDone - 1; | ||
// If we have no error, we are done | ||
if(err != null && numberOfRetriesDone <= 0) { | ||
// No more retries, clear the retry interval and fire an error | ||
clearInterval(intervalId); | ||
callback(err, null); | ||
} else if(err == null) { | ||
// Clear retries and fire message | ||
clearInterval(intervalId); | ||
// If we have auths we need to replay them | ||
if(Array.isArray(self.auths) && self.auths.length > 0) { | ||
// Get number of auths we need to execute | ||
var numberOfAuths = self.auths.length; | ||
// Apply all auths | ||
for(var i = 0; i < self.auths.length; i++) { | ||
self.authenticate(self.auths[i].username, self.auths[i].password, function(err, authenticated) { | ||
numberOfAuths = numberOfAuths - 1; | ||
// If we have no more authentications to replay | ||
if(numberOfAuths == 0) { | ||
if(err != null || !authenticated) { | ||
return callback(err, null); | ||
} else { | ||
writer.send(db_command, rawConnection); | ||
} | ||
} catch (err) { | ||
// Set server config to disconnected if it's a replicaset | ||
if(self.serverConfig instanceof ReplSetServers && err == "notConnected") { | ||
// Just clear up all connections as we need to perform a complete reconnect for the call | ||
self.serverConfig.disconnect(); | ||
command(self, db_command, options, callback); | ||
} | ||
// Signal an error | ||
if(!(err instanceof Error)) err = new Error(err); | ||
if(callback instanceof Function) { | ||
if(errorCommand != null) delete self.notReplied[errorCommand.getRequestId().toString()]; | ||
return callback(err, null); | ||
} | ||
} | ||
} | ||
}); | ||
} | ||
// Force a reconnect after self.serverConfig.reconnectWait seconds | ||
setTimeout(reconnectAttempt, self.serverConfig.reconnectWait); | ||
} else { | ||
// for the other instances fire the message | ||
var writer = read ? self.serverConfig.checkoutReader() : self.serverConfig.checkoutWriter(); | ||
// If we got safe set | ||
if(errorCommand != null) { | ||
writer.send([db_command, errorCommand], rawConnection); | ||
}) | ||
} | ||
} else { | ||
writer.send(db_command, rawConnection); | ||
} | ||
} | ||
} catch(err){ | ||
// Set server config to disconnected if it's a replicaset | ||
if(self.serverConfig instanceof ReplSetServers && err == "notConnected") { | ||
// Just clear up all connections as we need to perform a complete reconnect for the call | ||
self.serverConfig.disconnect(); | ||
command(self, db_command, options, callback); | ||
} | ||
} | ||
if(err == 'notConnected') { | ||
delete self.notReplied[listenToCommand.getRequestId().toString()]; | ||
return callback(new Error(err), null); | ||
} | ||
}); | ||
}, retryInMilliseconds); | ||
} | ||
// Signal an error | ||
if(!(err instanceof Error)) err = new Error(err); | ||
if(callback instanceof Function) { | ||
if(errorCommand != null) delete self.notReplied[errorCommand.getRequestId().toString()]; | ||
return callback(err, null); | ||
} | ||
// Return error object | ||
return err; | ||
} | ||
/** | ||
Execute db query command (not safe) | ||
**/ | ||
Db.prototype._executeQueryCommand = function(db_command, options, callback) { | ||
var self = this; | ||
// Unpack the parameters | ||
var args = Array.prototype.slice.call(arguments, 1); | ||
callback = args.pop(); | ||
options = args.length ? args.shift() : {}; | ||
// If the pool is not connected, attempt to reconnect to send the message | ||
if(!this.serverConfig.isConnected() && this.serverConfig.autoReconnect) { | ||
__retryCommandOnFailure(this, this.retryMiliSeconds, this.numberOfRetries, __executeQueryCommand, db_command, options, callback); | ||
} else { | ||
__executeQueryCommand(self, db_command, options, callback) | ||
} | ||
}; | ||
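
_executeQueryCommand is the internal funnel that the public helpers above (dropCollection, lastStatus, executeDbCommand and so on) all call; when the pool is down and autoReconnect is set, the command is first run through __retryCommandOnFailure. A hedged sketch of driving it directly, given an open db handle as in the earlier sketch; the require path is an assumption, and ordinary application code would go through db.executeDbCommand instead:

    var DbCommand = require('./lib/mongodb/commands/db_command').DbCommand; // path assumed

    db._executeQueryCommand(DbCommand.createDbCommand(db, {ping: 1}), {read: true}, function(err, result) {
      if (err) return console.error(err);
      console.log(result.documents[0]); // e.g. { ok: 1 }
    });
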
var __executeInsertCommand = function(self, db_command, options, callback) { | ||
// Always checkout a writer for this kind of operations | ||
var connection = self.serverConfig.checkoutWriter(); | ||
var safe = options['safe'] != null ? options['safe'] : false; | ||
var raw = options['raw'] != null ? options['raw'] : self.raw; | ||
var specifiedConnection = options['connection'] != null ? options['connection'] : null; | ||
// Override connection if needed | ||
connection = specifiedConnection != null ? specifiedConnection : connection; | ||
// Ensure we have a valid connection | ||
if(callback instanceof Function) { | ||
// Ensure we have a valid connection | ||
if(connection == null) return callback(new Error("no open connections")); | ||
// We are expecting a check right after the actual operation | ||
if(safe != null && safe != false) { | ||
// db command is now an array of commands (original command + lastError) | ||
db_command = [db_command, DbCommand.createGetLastErrorCommand(safe, self)]; | ||
// Register the handler in the data structure | ||
self._registerHandler(db_command[1], raw, connection, callback); | ||
} | ||
} | ||
// If we have no callback and there is no connection | ||
if(connection == null) return null; | ||
// Write the message out | ||
connection.write(db_command, function(err) { | ||
// Return the callback if it's not a safe operation and the callback is defined | ||
if(callback instanceof Function && (safe == null || safe == false)) { | ||
callback(err, null); | ||
} else if(callback instanceof Function){ | ||
// Clean up listener and return error | ||
var callbackInstance = self._removeHandler(db_command[1]); | ||
// Only call if the reaper has not removed it | ||
if(callbackInstance != null) { | ||
callbackInstance.callback(err, null); | ||
} | ||
} else { | ||
self.emit("error", err); | ||
} | ||
}); | ||
} | ||
/** | ||
Execute an insert Command | ||
**/ | ||
Db.prototype._executeInsertCommand = function(db_command, options, callback) { | ||
var self = this; | ||
// Unpack the parameters | ||
var args = Array.prototype.slice.call(arguments, 1); | ||
callback = args.pop(); | ||
options = args.length ? args.shift() : {}; | ||
// If the pool is not connected, attempt to reconnect to send the message | ||
if(!this.serverConfig.isConnected() && this.serverConfig.autoReconnect) { | ||
__retryCommandOnFailure(this, this.retryMiliSeconds, this.numberOfRetries, __executeInsertCommand, db_command, options, callback); | ||
} else { | ||
__executeInsertCommand(self, db_command, options, callback) | ||
} | ||
} | ||
// Update command is the same | ||
Db.prototype._executeUpdateCommand = Db.prototype._executeInsertCommand; | ||
Db.prototype._executeRemoveCommand = Db.prototype._executeInsertCommand; | ||
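
When safe is set, __executeInsertCommand sends the write together with a getLastError command on the same connection and registers the callback under the getLastError request id, so write errors come back through the normal reply path; without safe, the callback (if any) fires as soon as the bytes are flushed. Update and remove reuse the same code path via the aliases above. At the public API level that looks roughly like the following, assuming an open collection handle:

    // safe:true pairs the insert with getLastError on the same socket
    collection.insert({a: 1}, {safe: true}, function(err, docs) {
      if (err) return console.error('write failed', err);
      console.log('write acknowledged');
    });

    // Without safe the driver does not wait for an acknowledgement
    collection.insert({a: 2});
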
/** | ||
* Wrap a Mongo error document into an Error instance | ||
*/ | ||
Db.prototype.wrap = function(error) { | ||
var e = new Error(error.err); | ||
var e = new Error(error.err != null ? error.err : error.errmsg); | ||
e.name = 'MongoError'; | ||
@@ -897,4 +1032,3 @@ | ||
var hostPort = h.split(':', 2); | ||
return new Server(hostPort[0] || 'localhost', hostPort[1] || 27017, | ||
serverOptions); | ||
return new Server(hostPort[0] || 'localhost', hostPort[1] != null ? parseInt(hostPort[1]) : 27017, serverOptions); | ||
}); | ||
@@ -928,1 +1062,9 @@ | ||
} | ||
Db.prototype.removeAllEventListeners = function() { | ||
this.removeAllListeners("close"); | ||
this.removeAllListeners("error"); | ||
this.removeAllListeners("parseError"); | ||
this.removeAllListeners("poolReady"); | ||
this.removeAllListeners("message"); | ||
} |
@@ -930,4 +930,4 @@ /** | ||
// Build query | ||
var query = typeof fileIdObject == 'string' ? {'filename':fileIdObject} : {'_id':fileIdObject}; | ||
// Attempt to locate file | ||
var query = (typeof fileIdObject == 'string' || Object.prototype.toString.call(fileIdObject) == '[object RegExp]' ) | ||
? {'filename':fileIdObject} : {'_id':fileIdObject}; // Attempt to locate file | ||
collection.find(query, function(err, cursor) { | ||
@@ -934,0 +934,0 @@ cursor.nextObject(function(err, item) { |
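
The tweaked query builder above now treats both strings and RegExp instances as filename lookups, while anything else is used as an _id. Its decision logic reduced to a standalone helper with a hypothetical name:

    // Sketch of the selector the lookup builds, isolated from the driver.
    function buildFileQuery(fileIdObject) {
      var byName = typeof fileIdObject == 'string'
        || Object.prototype.toString.call(fileIdObject) == '[object RegExp]';
      return byName ? {'filename': fileIdObject} : {'_id': fileIdObject};
    }

    console.log(buildFileQuery('report.pdf')); // { filename: 'report.pdf' }
    console.log(buildFileQuery(/^report/));    // { filename: /^report/ }
    console.log(buildFileQuery(12345));        // { _id: 12345 }, anything else is an id lookup
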
@@ -21,5 +21,4 @@ | ||
, 'collection' | ||
, 'connections/server' | ||
, 'connections/repl_set_servers' | ||
, 'connection' | ||
, 'connection/server' | ||
, 'connection/repl_set_servers' | ||
, 'cursor' | ||
@@ -54,5 +53,4 @@ , 'db' | ||
, 'collection' | ||
, 'connections/server' | ||
, 'connections/repl_set_servers' | ||
, 'connection' | ||
, 'connection/server' | ||
, 'connection/repl_set_servers' | ||
, 'cursor' | ||
@@ -89,5 +87,4 @@ , 'db' | ||
, 'collection' | ||
, 'connections/server' | ||
, 'connections/repl_set_servers' | ||
, 'connection' | ||
, 'connection/server' | ||
, 'connection/repl_set_servers' | ||
, 'cursor' | ||
@@ -94,0 +91,0 @@ , 'db' |
@@ -8,41 +8,54 @@ var Long = require('../goog/math/long').Long, | ||
**/ | ||
var MongoReply = exports.MongoReply = function(db, binary_reply) { | ||
var MongoReply = exports.MongoReply = function() { | ||
this.documents = []; | ||
var index = 0; | ||
this.index = 0; | ||
}; | ||
MongoReply.prototype.parseHeader = function(binary_reply, bson) { | ||
// Unpack the standard header first | ||
var messageLength = binary_reply[index] | binary_reply[index + 1] << 8 | binary_reply[index + 2] << 16 | binary_reply[index + 3] << 24; | ||
index = index + 4; | ||
this.messageLength = binary_reply[this.index] | binary_reply[this.index + 1] << 8 | binary_reply[this.index + 2] << 16 | binary_reply[this.index + 3] << 24; | ||
this.index = this.index + 4; | ||
// Fetch the request id for this reply | ||
this.requestId = binary_reply[index] | binary_reply[index + 1] << 8 | binary_reply[index + 2] << 16 | binary_reply[index + 3] << 24; | ||
index = index + 4; | ||
this.requestId = binary_reply[this.index] | binary_reply[this.index + 1] << 8 | binary_reply[this.index + 2] << 16 | binary_reply[this.index + 3] << 24; | ||
this.index = this.index + 4; | ||
// Fetch the id of the request that triggered the response | ||
this.responseTo = binary_reply[index] | binary_reply[index + 1] << 8 | binary_reply[index + 2] << 16 | binary_reply[index + 3] << 24; | ||
this.responseTo = binary_reply[this.index] | binary_reply[this.index + 1] << 8 | binary_reply[this.index + 2] << 16 | binary_reply[this.index + 3] << 24; | ||
// Skip op-code field | ||
index = index + 4 + 4; | ||
this.index = this.index + 4 + 4; | ||
// Unpack the reply message | ||
this.responseFlag = binary_reply[index] | binary_reply[index + 1] << 8 | binary_reply[index + 2] << 16 | binary_reply[index + 3] << 24; | ||
index = index + 4; | ||
this.responseFlag = binary_reply[this.index] | binary_reply[this.index + 1] << 8 | binary_reply[this.index + 2] << 16 | binary_reply[this.index + 3] << 24; | ||
this.index = this.index + 4; | ||
// Unpack the cursor id (a 64 bit long integer) | ||
var low_bits = binary_reply[index] | binary_reply[index + 1] << 8 | binary_reply[index + 2] << 16 | binary_reply[index + 3] << 24; | ||
index = index + 4; | ||
var high_bits = binary_reply[index] | binary_reply[index + 1] << 8 | binary_reply[index + 2] << 16 | binary_reply[index + 3] << 24; | ||
index = index + 4; | ||
this.cursorId = new db.bson_deserializer.Long(low_bits, high_bits); | ||
var low_bits = binary_reply[this.index] | binary_reply[this.index + 1] << 8 | binary_reply[this.index + 2] << 16 | binary_reply[this.index + 3] << 24; | ||
this.index = this.index + 4; | ||
var high_bits = binary_reply[this.index] | binary_reply[this.index + 1] << 8 | binary_reply[this.index + 2] << 16 | binary_reply[this.index + 3] << 24; | ||
this.index = this.index + 4; | ||
this.cursorId = new bson.Long(low_bits, high_bits); | ||
// Unpack the starting from | ||
this.startingFrom = binary_reply[index] | binary_reply[index + 1] << 8 | binary_reply[index + 2] << 16 | binary_reply[index + 3] << 24; | ||
index = index + 4; | ||
this.startingFrom = binary_reply[this.index] | binary_reply[this.index + 1] << 8 | binary_reply[this.index + 2] << 16 | binary_reply[this.index + 3] << 24; | ||
this.index = this.index + 4; | ||
// Unpack the number of objects returned | ||
this.numberReturned = binary_reply[index] | binary_reply[index + 1] << 8 | binary_reply[index + 2] << 16 | binary_reply[index + 3] << 24; | ||
index = index + 4; | ||
this.numberReturned = binary_reply[this.index] | binary_reply[this.index + 1] << 8 | binary_reply[this.index + 2] << 16 | binary_reply[this.index + 3] << 24; | ||
this.index = this.index + 4; | ||
} | ||
MongoReply.prototype.parseBody = function(binary_reply, bson, raw) { | ||
raw = raw == null ? false : raw; | ||
// Let's unpack all the bson document, deserialize them and store them | ||
for(var object_index = 0; object_index < this.numberReturned; object_index++) { | ||
// Read the size of the bson object | ||
var bsonObjectSize = binary_reply[index] | binary_reply[index + 1] << 8 | binary_reply[index + 2] << 16 | binary_reply[index + 3] << 24; | ||
// Deserialize the object and add to the documents array | ||
this.documents.push(db.bson_deserializer.BSON.deserialize(binary_reply.slice(index, index + bsonObjectSize))); | ||
var bsonObjectSize = binary_reply[this.index] | binary_reply[this.index + 1] << 8 | binary_reply[this.index + 2] << 16 | binary_reply[this.index + 3] << 24; | ||
// If we are storing the raw responses to pipe straight through | ||
if(raw) { | ||
// Deserialize the object and add to the documents array | ||
this.documents.push(binary_reply.slice(this.index, this.index + bsonObjectSize)); | ||
} else { | ||
// Deserialize the object and add to the documents array | ||
this.documents.push(bson.BSON.deserialize(binary_reply.slice(this.index, this.index + bsonObjectSize))); | ||
} | ||
// Adjust binary index to point to next block of binary bson data | ||
index = index + bsonObjectSize; | ||
} | ||
}; | ||
this.index = this.index + bsonObjectSize; | ||
} | ||
} | ||
@@ -49,0 +62,0 @@ MongoReply.prototype.is_error = function(){ |
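
All of the header fields above are 32-bit little-endian integers decoded with the shift-and-or pattern, likely because Buffer.readUInt32LE was not available on the older node versions the driver still supported. The decode isolated as a helper, with the standard reply-header offsets spelled out:

    // Same decode as used in parseHeader above, factored into a helper
    function readInt32LE(buffer, index) {
      return buffer[index] | buffer[index + 1] << 8 | buffer[index + 2] << 16 | buffer[index + 3] << 24;
    }

    // Standard wire-protocol header layout in a reply buffer:
    // var messageLength = readInt32LE(binary_reply, 0);
    // var requestId     = readInt32LE(binary_reply, 4);
    // var responseTo    = readInt32LE(binary_reply, 8);
    // var opCode        = readInt32LE(binary_reply, 12);
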
{ "name" : "mongodb" | ||
, "description" : "A node.js driver for MongoDB" | ||
, "keywords" : ["mongodb", "mongo", "driver", "db"] | ||
, "version" : "0.9.6-23" | ||
, "version" : "0.9.7-0" | ||
, "author" : "Christian Amor Kvalheim <christkv@gmail.com>" | ||
@@ -6,0 +6,0 @@ , "contributors" : [ "Aaron Heckmann", |
@@ -14,3 +14,3 @@ var mongodb = process.env['TEST_NATIVE'] != null ? require('../lib/mongodb').native() : require('../lib/mongodb').pure(); | ||
var MONGODB = 'integration_tests'; | ||
var client = new Db(MONGODB, new Server("127.0.0.1", 27017, {auto_reconnect: true, poolSize: 1}), {native_parser: (process.env['TEST_NATIVE'] != null)}); | ||
var client = new Db(MONGODB, new Server("127.0.0.1", 27017, {auto_reconnect: true, poolSize: 4}), {native_parser: (process.env['TEST_NATIVE'] != null)}); | ||
@@ -49,3 +49,3 @@ // Define the tests, we want them to run as a nested test so we only clean up the | ||
shouldCorrectlyCallValidateCollection : function(test) { | ||
var fs_client = new Db(MONGODB, new Server("127.0.0.1", 27017, {auto_reconnect: false}), {native_parser: (process.env['TEST_NATIVE'] != null)}); | ||
var fs_client = new Db(MONGODB, new Server("127.0.0.1", 27017, {auto_reconnect: false, poolSize: 4}), {native_parser: (process.env['TEST_NATIVE'] != null)}); | ||
fs_client.bson_deserializer = client.bson_deserializer; | ||
@@ -60,15 +60,17 @@ fs_client.bson_serializer = client.bson_serializer; | ||
fs_client.admin(function(err, adminDb) { | ||
adminDb.authenticate('admin', 'admin', function(err, replies) { | ||
adminDb.validateCollection('test', function(err, doc) { | ||
// Pre 1.9.1 servers | ||
if(doc.result != null) { | ||
test.ok(doc.result != null); | ||
test.ok(doc.result.match(/firstExtent/) != null); | ||
} else { | ||
test.ok(doc.firstExtent != null); | ||
} | ||
adminDb.addUser('admin', 'admin', function(err, result) { | ||
adminDb.authenticate('admin', 'admin', function(err, replies) { | ||
adminDb.validateCollection('test', function(err, doc) { | ||
// Pre 1.9.1 servers | ||
if(doc.result != null) { | ||
test.ok(doc.result != null); | ||
test.ok(doc.result.match(/firstExtent/) != null); | ||
} else { | ||
test.ok(doc.firstExtent != null); | ||
} | ||
fs_client.close(); | ||
test.done(); | ||
}); | ||
fs_client.close(); | ||
test.done(); | ||
}); | ||
}); | ||
}); | ||
@@ -83,3 +85,3 @@ }); | ||
shouldCorrectlySetDefaultProfilingLevel : function(test) { | ||
var fs_client = new Db(MONGODB, new Server("127.0.0.1", 27017, {auto_reconnect: false}), {native_parser: (process.env['TEST_NATIVE'] != null)}); | ||
var fs_client = new Db(MONGODB, new Server("127.0.0.1", 27017, {auto_reconnect: false, poolSize: 4}), {native_parser: (process.env['TEST_NATIVE'] != null)}); | ||
fs_client.bson_deserializer = client.bson_deserializer; | ||
@@ -94,8 +96,10 @@ fs_client.bson_serializer = client.bson_serializer; | ||
fs_client.admin(function(err, adminDb) { | ||
adminDb.authenticate('admin', 'admin', function(err, replies) { | ||
adminDb.profilingLevel(function(err, level) { | ||
test.equal("off", level); | ||
adminDb.addUser('admin', 'admin', function(err, result) { | ||
adminDb.authenticate('admin', 'admin', function(err, replies) { | ||
adminDb.profilingLevel(function(err, level) { | ||
test.equal("off", level); | ||
fs_client.close(); | ||
test.done(); | ||
fs_client.close(); | ||
test.done(); | ||
}); | ||
}); | ||
@@ -111,3 +115,3 @@ }); | ||
shouldCorrectlyChangeProfilingLevel : function(test) { | ||
var fs_client = new Db(MONGODB, new Server("127.0.0.1", 27017, {auto_reconnect: false}), {native_parser: (process.env['TEST_NATIVE'] != null)}); | ||
var fs_client = new Db(MONGODB, new Server("127.0.0.1", 27017, {auto_reconnect: false, poolSize: 4}), {native_parser: (process.env['TEST_NATIVE'] != null)}); | ||
fs_client.bson_deserializer = client.bson_deserializer; | ||
@@ -157,3 +161,3 @@ fs_client.bson_serializer = client.bson_serializer; | ||
shouldCorrectlySetAndExtractProfilingInfo : function(test) { | ||
var fs_client = new Db(MONGODB, new Server("127.0.0.1", 27017, {auto_reconnect: false}), {native_parser: (process.env['TEST_NATIVE'] != null)}); | ||
var fs_client = new Db(MONGODB, new Server("127.0.0.1", 27017, {auto_reconnect: false, poolSize: 4}), {native_parser: (process.env['TEST_NATIVE'] != null)}); | ||
fs_client.bson_deserializer = client.bson_deserializer; | ||
@@ -193,3 +197,3 @@ fs_client.bson_serializer = client.bson_serializer; | ||
}, | ||
// run this last | ||
@@ -206,140 +210,2 @@ noGlobalsLeaked : function(test) { | ||
// Assign out tests | ||
module.exports = tests; | ||
// test_kill_cursors : function() { | ||
// var test_kill_cursors_client = new Db('integration_tests4_', new Server("127.0.0.1", 27017, {auto_reconnect: true}), {}); | ||
// test_kill_cursors_client.bson_deserializer = client.bson_deserializer; | ||
// test_kill_cursors_client.bson_serializer = client.bson_serializer; | ||
// test_kill_cursors_client.pkFactory = client.pkFactory; | ||
// | ||
// test_kill_cursors_client.open(function(err, test_kill_cursors_client) { | ||
// var number_of_tests_done = 0; | ||
// | ||
// test_kill_cursors_client.dropCollection('test_kill_cursors', function(err, collection) { | ||
// test_kill_cursors_client.createCollection('test_kill_cursors', function(err, collection) { | ||
// test_kill_cursors_client.cursorInfo(function(err, cursorInfo) { | ||
// var clientCursors = cursorInfo.clientCursors_size; | ||
// var byLocation = cursorInfo.byLocation_size; | ||
// | ||
// for(var i = 0; i < 1000; i++) { | ||
// collection.save({'i': i}, function(err, doc) {}); | ||
// } | ||
// | ||
// test_kill_cursors_client.cursorInfo(function(err, cursorInfo) { | ||
// test.equal(clientCursors, cursorInfo.clientCursors_size); | ||
// test.equal(byLocation, cursorInfo.byLocation_size); | ||
// | ||
// for(var i = 0; i < 10; i++) { | ||
// collection.findOne(function(err, item) {}); | ||
// } | ||
// | ||
// test_kill_cursors_client.cursorInfo(function(err, cursorInfo) { | ||
// test.equal(clientCursors, cursorInfo.clientCursors_size); | ||
// test.equal(byLocation, cursorInfo.byLocation_size); | ||
// | ||
// for(var i = 0; i < 10; i++) { | ||
// collection.find(function(err, cursor) { | ||
// cursor.nextObject(function(err, item) { | ||
// cursor.close(function(err, cursor) {}); | ||
// | ||
// if(i == 10) { | ||
// test_kill_cursors_client.cursorInfo(function(err, cursorInfo) { | ||
// test.equal(clientCursors, cursorInfo.clientCursors_size); | ||
// test.equal(byLocation, cursorInfo.byLocation_size); | ||
// | ||
// collection.find(function(err, cursor) { | ||
// cursor.nextObject(function(err, item) { | ||
// test_kill_cursors_client.cursorInfo(function(err, cursorInfo) { | ||
// test.equal(clientCursors, cursorInfo.clientCursors_size); | ||
// test.equal(byLocation, cursorInfo.byLocation_size); | ||
// | ||
// cursor.close(function(err, cursor) { | ||
// test_kill_cursors_client.cursorInfo(function(err, cursorInfo) { | ||
// test.equal(clientCursors, cursorInfo.clientCursors_size); | ||
// test.equal(byLocation, cursorInfo.byLocation_size); | ||
// | ||
// collection.find({}, {'limit':10}, function(err, cursor) { | ||
// cursor.nextObject(function(err, item) { | ||
// test_kill_cursors_client.cursorInfo(function(err, cursorInfo) { | ||
// test_kill_cursors_client.cursorInfo(function(err, cursorInfo) { | ||
// sys.puts("===================================== err: " + err) | ||
// sys.puts("===================================== cursorInfo: " + sys.inspect(cursorInfo)) | ||
// | ||
// | ||
// test.equal(clientCursors, cursorInfo.clientCursors_size); | ||
// test.equal(byLocation, cursorInfo.byLocation_size); | ||
// number_of_tests_done = 1; | ||
// }); | ||
// }); | ||
// }); | ||
// }); | ||
// }); | ||
// }); | ||
// }); | ||
// }); | ||
// }); | ||
// }); | ||
// } | ||
// }); | ||
// }); | ||
// } | ||
// }); | ||
// }); | ||
// }); | ||
// }); | ||
// }); | ||
// | ||
// var intervalId = setInterval(function() { | ||
// if(number_of_tests_done == 1) { | ||
// clearInterval(intervalId); | ||
// finished_test({test_kill_cursors:'ok'}); | ||
// test_kill_cursors_client.close(); | ||
// } | ||
// }, 100); | ||
// }); | ||
// }, | ||
// test_force_binary_error : function() { | ||
// client.createCollection('test_force_binary_error', function(err, collection) { | ||
// // Try to fetch an object using a totally invalid and wrong hex string... what we're interested in here | ||
// // is the error handling of the findOne Method | ||
// var result= ""; | ||
// var hexString = "5e9bd59248305adf18ebc15703a1"; | ||
// for(var index=0 ; index < hexString.length; index+=2) { | ||
// var string= hexString.substr(index, 2); | ||
// var number= parseInt(string, 16); | ||
// result+= BinaryParser.fromByte(number); | ||
// } | ||
// | ||
// Generate an illegal ID | ||
// var id = client.bson_serializer.ObjectID.createFromHexString('5e9bd59248305adf18ebc157'); | ||
// id.id = result; | ||
// | ||
// // Execute with error | ||
// collection.findOne({"_id": id}, function(err, result) { | ||
// // test.equal(undefined, result) | ||
// test.ok(err != null) | ||
// finished_test({test_force_binary_error:'ok'}); | ||
// }); | ||
// }); | ||
// }, | ||
// test_long_term_insert : function() { | ||
// var numberOfTimes = 21000; | ||
// | ||
// client.createCollection('test_safe_insert', function(err, collection) { | ||
// var timer = setInterval(function() { | ||
// collection.insert({'test': 1}, {safe:true}, function(err, result) { | ||
// numberOfTimes = numberOfTimes - 1; | ||
// | ||
// if(numberOfTimes <= 0) { | ||
// clearInterval(timer); | ||
// collection.count(function(err, count) { | ||
// test.equal(21000, count); | ||
// finished_test({test_long_term_insert:'ok'}) | ||
// }); | ||
// } | ||
// }); | ||
// }, 1); | ||
// }); | ||
// }, | ||
module.exports = tests; |
@@ -83,12 +83,10 @@ var mongodb = process.env['TEST_NATIVE'] != null ? require('../lib/mongodb').native() : require('../lib/mongodb').pure(); | ||
test.ok(err == null); | ||
test.ok(replies); | ||
// Kill a connection to force a reconnect | ||
p_client.serverConfig.connection.pool[0].connection.end(); | ||
p_client.serverConfig.close(); | ||
p_client.createCollection('shouldCorrectlyReAuthorizeReconnectedConnections', function(err, collection) { | ||
collection.insert({a:1}, {safe:true}, function(err, r) { | ||
collection.insert({a:2}, {safe:true}, function(err, r) { | ||
collection.insert({a:3}, {safe:true}, function(err, r) { | ||
collection.insert({a:3}, {safe:true}, function(err, r) { | ||
collection.count(function(err, count) { | ||
@@ -141,3 +139,3 @@ test.equal(3, count); | ||
}, | ||
// run this last | ||
@@ -144,0 +142,0 @@ noGlobalsLeaked : function(test) { |
var mongodb = process.env['TEST_NATIVE'] != null ? require('../../lib/mongodb').native() : require('../../lib/mongodb').pure(); | ||
var testCase = require('../../deps/nodeunit').testCase, | ||
debug = require('util').debug | ||
debug = require('util').debug, | ||
inspect = require('util').inspect, | ||
nodeunit = require('../../deps/nodeunit'), | ||
gleak = require('../../tools/gleak'), | ||
Db = mongodb.Db, | ||
@@ -93,3 +94,2 @@ Cursor = mongodb.Cursor, | ||
test.ok(result2 != null); | ||
admin.logout(this.parallel()); | ||
@@ -146,10 +146,11 @@ db1.admin().logout(this.parallel()); | ||
db1.collection('stuff', function(err, collection) { | ||
collection.find().toArray(function(err, items) { | ||
test.ok(err == null); | ||
test.equal(1, items.length); | ||
db1.collection('stuff', function(err, collection) { | ||
collection.insert({a:2}, {safe:true}, self.parallel()); | ||
}); | ||
db2.collection('stuff', function(err, collection) { | ||
@@ -178,4 +179,3 @@ collection.insert({a:2}, {safe:true}, self.parallel()); | ||
function logoutDb2(err, result) { | ||
test.ok(err != null); | ||
test.ok(err != null); | ||
db2.logout(this); | ||
@@ -190,2 +190,6 @@ }, | ||
test.done(); | ||
// Close all connections | ||
db1.close(); | ||
db2.close(); | ||
admin.close(); | ||
}); | ||
@@ -196,2 +200,8 @@ }); | ||
}, | ||
noGlobalsLeaked : function(test) { | ||
var leaks = gleak.detectNew(); | ||
test.equal(0, leaks.length, "global var leak detected: " + leaks.join(', ')); | ||
test.done(); | ||
} | ||
}) | ||
@@ -198,0 +208,0 @@ |
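
Most suites in this comparison gain a trailing `noGlobalsLeaked` case built on the bundled gleak tool, using `detectNew` so only globals introduced since the previous check are reported. A minimal nodeunit-style sketch of that pattern (the relative require path matches the test files above):

    var gleak = require('../../tools/gleak');

    // Run as the last test in the file: fails if any earlier test leaked a global.
    exports.noGlobalsLeaked = function(test) {
      var leaks = gleak.detectNew();   // only globals added since the last detectNew() call
      test.equal(0, leaks.length, "global var leak detected: " + leaks.join(', '));
      test.done();
    };
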
var mongodb = process.env['TEST_NATIVE'] != null ? require('../../lib/mongodb').native() : require('../../lib/mongodb').pure(); | ||
var testCase = require('../../deps/nodeunit').testCase, | ||
debug = require('util').debug | ||
debug = require('util').debug, | ||
inspect = require('util').inspect, | ||
nodeunit = require('../../deps/nodeunit'), | ||
gleak = require('../../tools/gleak'), | ||
Db = mongodb.Db, | ||
@@ -18,2 +19,3 @@ Cursor = mongodb.Cursor, | ||
var serverManager = null; | ||
var RS = RS == null ? null : RS; | ||
@@ -38,11 +40,10 @@ // Define the tests, we want them to run as a nested test so we only clean up the | ||
RS.killAll(function() { | ||
callback(); | ||
}) | ||
callback(); | ||
}); | ||
}, | ||
shouldCorrectlyAuthenticate : function(test) { | ||
shouldCorrectlyAuthenticateWithMultipleLoginsAndLogouts : function(test) { | ||
var replSet = new ReplSetServers( [ | ||
new Server( RS.host, RS.ports[1], { auto_reconnect: true } ), | ||
new Server( RS.host, RS.ports[0], { auto_reconnect: true } ), | ||
new Server( RS.host, RS.ports[2], { auto_reconnect: true } ) | ||
], | ||
@@ -83,3 +84,3 @@ {rs_name:RS.name} | ||
db.collection("stuff", function(err, collection) { | ||
collection.insert({a:2}, {safe: {w: 3}}, self); | ||
collection.insert({a:3}, {safe: true}, self); | ||
}); | ||
@@ -99,3 +100,3 @@ }, | ||
test.ok(err == null); | ||
test.equal(2, item.a); | ||
test.equal(3, item.a); | ||
@@ -125,7 +126,6 @@ db.admin().logout(this); | ||
}); | ||
}, | ||
}, | ||
function shouldCorrectlyAuthenticateAgainstSecondary(err, result) { | ||
test.ok(err != null) | ||
test.ok(err != null) | ||
slaveDb.admin().authenticate('me', 'secret', this); | ||
@@ -146,5 +146,7 @@ }, | ||
test.ok(err == null); | ||
test.equal(2, item.a); | ||
test.equal(3, item.a); | ||
test.done(); | ||
p_db.close(); | ||
slaveDb.close(); | ||
} | ||
@@ -159,3 +161,2 @@ ) | ||
new Server( RS.host, RS.ports[0], { auto_reconnect: true } ), | ||
new Server( RS.host, RS.ports[2], { auto_reconnect: true } ) | ||
], | ||
@@ -178,5 +179,5 @@ {rs_name:RS.name, read_secondary:true} | ||
test.ok(result != null); | ||
db.collection("stuff", function(err, collection) { | ||
collection.insert({a:2}, {safe: {w: 3}}, self); | ||
collection.insert({a:2}, {safe: {w: 2, wtimeout: 10000}}, self); | ||
}); | ||
@@ -195,5 +196,5 @@ }, | ||
test.ok(result); | ||
db.collection("stuff", function(err, collection) { | ||
collection.insert({a:2}, {safe: {w: 3}}, self); | ||
collection.insert({a:2}, {safe: {w: 2, wtimeout: 10000}}, self); | ||
}); | ||
@@ -215,6 +216,13 @@ }, | ||
test.done(); | ||
p_db.close(); | ||
} | ||
) | ||
}); | ||
} | ||
}, | ||
noGlobalsLeaked : function(test) { | ||
var leaks = gleak.detectNew(); | ||
test.equal(0, leaks.length, "global var leak detected: " + leaks.join(', ')); | ||
test.done(); | ||
} | ||
}) | ||
@@ -221,0 +229,0 @@ |
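
The replica-set tests move from `{safe: {w: 3}}` to `{safe: {w: 2, wtimeout: 10000}}`, i.e. wait until two members have acknowledged the write but give up after ten seconds rather than blocking indefinitely. A hedged usage sketch, assuming `db` is an open Db connected to the replica set; the collection name is a placeholder:

    db.collection('stuff', function(err, collection) {
      // Insert and wait for replication to 2 members, with a 10s timeout
      collection.insert({a: 3}, {safe: {w: 2, wtimeout: 10000}}, function(err, result) {
        if(err != null) return console.error('replication not confirmed in time', err);
        console.log('write acknowledged by at least two members');
      });
    });
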
@@ -224,5 +224,5 @@ var mongodb = process.env['TEST_NATIVE'] != null ? require('../lib/mongodb').native() : require('../lib/mongodb').pure(); | ||
error_client.bson_serializer = client.bson_serializer; | ||
error_client.pkFactory = client.pkFactory; | ||
error_client.pkFactory = client.pkFactory; | ||
test.equal(true, error_client.strict); | ||
error_client.open(function(err, error_client) { | ||
@@ -233,3 +233,3 @@ error_client.collection('does-not-exist', function(err, collection) { | ||
}); | ||
error_client.createCollection('test_strict_access_collection', function(err, collection) { | ||
@@ -288,21 +288,21 @@ error_client.collection('test_strict_access_collection', function(err, collection) { | ||
test.equal("key $hello must not start with '$'", err.message); | ||
collection.insert({'he$llo':'world'}, {safe:true}, function(err, docs) { | ||
test.ok(docs[0].constructor == Object); | ||
collection.insert({'hello':{'hell$o':'world'}}, {safe:true}, function(err, docs) { | ||
test.ok(err == null); | ||
collection.insert({'.hello':'world'}, {safe:true}, function(err, doc) { | ||
test.ok(err instanceof Error); | ||
test.equal("key .hello must not contain '.'", err.message); | ||
collection.insert({'hello':{'.hello':'world'}}, {safe:true}, function(err, doc) { | ||
test.ok(err instanceof Error); | ||
test.equal("key .hello must not contain '.'", err.message); | ||
collection.insert({'hello.':'world'}, {safe:true}, function(err, doc) { | ||
test.ok(err instanceof Error); | ||
test.equal("key hello. must not contain '.'", err.message); | ||
collection.insert({'hello':{'hello.':'world'}}, {safe:true}, function(err, doc) { | ||
@@ -469,3 +469,3 @@ test.ok(err instanceof Error); | ||
test.equal(1, doc.a); | ||
id = new client.bson_serializer.ObjectID(null) | ||
@@ -485,3 +485,3 @@ doc = {_id:id, a:2}; | ||
test.equal(2, doc2.a); | ||
collection.update({"_id":id}, doc2, {safe:true, upsert:true}, function(err, result) { | ||
@@ -635,3 +635,3 @@ test.equal(null, err); | ||
}, | ||
// run this last | ||
@@ -638,0 +638,0 @@ noGlobalsLeaked : function(test) { |
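
The hunk above exercises the driver's key validation on insert: keys may not start with `$` or contain `.`, while a `$` elsewhere in the key (`he$llo`) is allowed. A short sketch of the resulting errors, assuming `collection` is already open; the messages shown are the ones asserted in the tests:

    collection.insert({'$hello': 'world'}, {safe: true}, function(err, docs) {
      // err.message === "key $hello must not start with '$'"
    });
    collection.insert({'.hello': 'world'}, {safe: true}, function(err, docs) {
      // err.message === "key .hello must not contain '.'"
    });
    collection.insert({'he$llo': 'world'}, {safe: true}, function(err, docs) {
      // accepted: '$' is only rejected at the start of a key
    });
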
@@ -70,19 +70,22 @@ var mongodb = process.env['TEST_NATIVE'] != null ? require('../lib/mongodb').native() : require('../lib/mongodb').pure(); | ||
testCloseNoCallback : function(test) { | ||
var db = new Db(MONGODB, new Server("127.0.0.1", 27017, {auto_reconnect: true, poolSize: 4}), | ||
{native_parser: (process.env['TEST_NATIVE'] != null)}); | ||
var db = new Db(MONGODB, new Server("127.0.0.1", 27017, {auto_reconnect: true, poolSize: 4}), {native_parser: (process.env['TEST_NATIVE'] != null)}); | ||
db.open(connectionTester(test, 'testCloseNoCallback', function() { | ||
var dbCloseCount = 0, connectionCloseCount = 0, poolCloseCount = 0; | ||
db.on('close', function() { ++dbCloseCount; }); | ||
var connection = db.serverConfig.connection; | ||
connection.on('close', function() { ++connectionCloseCount; }); | ||
connection.pool.forEach(function(poolMember) { | ||
poolMember.connection.on('close', function() { ++poolCloseCount; }); | ||
}); | ||
db.close(); | ||
setTimeout(function() { | ||
test.equal(dbCloseCount, 1); | ||
test.equal(connectionCloseCount, 1); | ||
test.equal(poolCloseCount, 4); | ||
test.done(); | ||
}, 250); | ||
// Ensure no close events are fired as we are closing the connection specifically | ||
db.on('close', function() { dbCloseCount++; }); | ||
var connectionPool = db.serverConfig.connectionPool; | ||
var connections = connectionPool.getAllConnections(); | ||
var keys = Object.keys(connections); | ||
// Ensure no close events are fired as we are closing the connection specifically | ||
for(var i = 0; i < keys.length; i++) { | ||
connections[keys[i]].on("close", function() { test.ok(false); }); | ||
} | ||
// Force the connection close | ||
db.serverConfig.connectionPool.stop(); | ||
// Test done | ||
test.equal(1, dbCloseCount); | ||
test.done(); | ||
})); | ||
@@ -92,21 +95,21 @@ }, | ||
testCloseWithCallback : function(test) { | ||
var db = new Db(MONGODB, new Server("127.0.0.1", 27017, {auto_reconnect: true, poolSize: 4}), | ||
{native_parser: (process.env['TEST_NATIVE'] != null)}); | ||
var db = new Db(MONGODB, new Server("127.0.0.1", 27017, {auto_reconnect: true, poolSize: 4}),{native_parser: (process.env['TEST_NATIVE'] != null)}); | ||
db.open(connectionTester(test, 'testCloseWithCallback', function() { | ||
var dbCloseCount = 0, connectionCloseCount = 0, poolCloseCount = 0; | ||
db.on('close', function() { ++dbCloseCount; }); | ||
var connection = db.serverConfig.connection; | ||
connection.on('close', function() { ++connectionCloseCount; }); | ||
connection.pool.forEach(function(poolMember) { | ||
poolMember.connection.on('close', function() { ++poolCloseCount; }); | ||
}); | ||
// Ensure no close events are fired as we are closing the connection specifically | ||
db.on('close', function() { dbCloseCount++; }); | ||
var connectionPool = db.serverConfig.connectionPool; | ||
var connections = connectionPool.getAllConnections(); | ||
var keys = Object.keys(connections); | ||
// Ensure no close events are fired as we are closing the connection specifically | ||
for(var i = 0; i < keys.length; i++) { | ||
connections[keys[i]].on("close", function() { test.ok(false); }); | ||
} | ||
db.close(function() { | ||
// Let all events fire. | ||
process.nextTick(function() { | ||
test.equal(dbCloseCount, 1); | ||
test.equal(connectionCloseCount, 1); | ||
test.equal(poolCloseCount, 4); | ||
test.done(); | ||
}); | ||
// Test done | ||
test.equal(0, dbCloseCount); | ||
test.done(); | ||
}); | ||
@@ -113,0 +116,0 @@ })); |
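
The rewritten close tests go through the new pool API: individual sockets are reached via `db.serverConfig.connectionPool.getAllConnections()` and the whole pool can be torn down with `stop()`. A hedged sketch mirroring the test, assuming `db` is an open Db built on the new connection pool:

    // Attach 'close' listeners to every pooled connection, then stop the pool directly.
    var connections = db.serverConfig.connectionPool.getAllConnections();
    Object.keys(connections).forEach(function(key) {
      connections[key].on('close', function() {
        console.log('pooled connection closed');
      });
    });

    // Force all pooled connections closed without calling db.close()
    db.serverConfig.connectionPool.stop();
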
@@ -21,3 +21,3 @@ var mongodb = process.env['TEST_NATIVE'] != null ? require('../../lib/mongodb').native() : require('../../lib/mongodb').pure(); | ||
'Should Correctly create a pool instance with the expected values' : function(test) { | ||
var connectionPool = new ConnectionPool('localhost', 2000, 1, {timeout:100, noDelay:true}); | ||
var connectionPool = new ConnectionPool('localhost', 2000, 1, null, {timeout:100, noDelay:true}); | ||
test.equal(100, connectionPool.socketOptions.timeout); | ||
@@ -31,12 +31,12 @@ test.equal(true, connectionPool.socketOptions.noDelay); | ||
'Should correctly fail due to no server' : function(test) { | ||
var connectionPool = new ConnectionPool('localhost', 2000, 1, {timeout:100, noDelay:true}); | ||
// Add event handler that will fire once the pool is ready | ||
connectionPool.on("poolReady", function(err, result) { | ||
var connectionPool = new ConnectionPool('localhost', 2000, 4, null, {timeout:100, noDelay:true}); | ||
// // Add event handler that will fire once the pool is ready | ||
connectionPool.on("poolReady", function(err, result) { | ||
}) | ||
// Add event handler that will fire when it fails | ||
connectionPool.on("error", function(err, connection) { | ||
test.equal(0, Object.keys(connectionPool.waitingToOpen).length); | ||
test.equal(1, Object.keys(connectionPool.connectionsWithErrors).length); | ||
test.equal(4, Object.keys(connectionPool.connectionsWithErrors).length); | ||
test.equal(0, Object.keys(connectionPool.openConnections).length); | ||
@@ -49,6 +49,6 @@ test.done(); | ||
}, | ||
'Should Correctly create a pool of connections and receive an ok when all connections are active' : function(test) { | ||
var connectionPool = new ConnectionPool('localhost', 27017, 4, {timeout:100, noDelay:true}); | ||
// Add event handler that will fire once the pool is ready | ||
@@ -63,2 +63,22 @@ connectionPool.on("poolReady", function() { | ||
'Should Correctly connect and then force a restart creating new connections' : function(test) { | ||
var connectionPool = new ConnectionPool('localhost', 27017, 4, {timeout:100, noDelay:true}); | ||
var done = false; | ||
// Add event handler that will fire once the pool is ready | ||
connectionPool.on("poolReady", function() { | ||
// Restart | ||
if(done) { | ||
test.done(); | ||
} else { | ||
// Trigger stop | ||
connectionPool.restart(); | ||
done = true; | ||
} | ||
}) | ||
// Start the pool | ||
connectionPool.start(); | ||
}, | ||
noGlobalsLeaked : function(test) { | ||
@@ -65,0 +85,0 @@ var leaks = gleak.detectNew(); |
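
Note the pool constructor gained an argument: the updated tests call `new ConnectionPool(host, port, poolSize, null, socketOptions)` instead of passing the socket options fourth. A hedged sketch of starting and restarting a pool, assuming `ConnectionPool` has been required from the driver's connection-pool module as in the test file above:

    var pool = new ConnectionPool('localhost', 27017, 4, null, {timeout: 100, noDelay: true});
    var restarted = false;

    pool.on("poolReady", function() {
      if(restarted) {
        console.log('pool came back up after restart');
        pool.stop();
      } else {
        restarted = true;
        pool.restart();   // tear down and reopen all 4 connections
      }
    });

    pool.on("error", function(err, connection) {
      console.error('pool connection error', err);
    });

    // Open the initial set of connections
    pool.start();
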
@@ -979,3 +979,35 @@ var testCase = require('../deps/nodeunit').testCase, | ||
}, | ||
'Should correctly rewind and restart cursor' : function(test) { | ||
var docs = []; | ||
for(var i = 0; i < 100; i++) { | ||
var d = new Date().getTime() + i*1000; | ||
docs[i] = {'a':i, createdAt:new Date(d)}; | ||
} | ||
// Create collection | ||
client.createCollection('Should_correctly_rewind_and_restart_cursor', function(err, collection) { | ||
test.equal(null, err); | ||
// insert all docs | ||
collection.insert(docs, {safe:true}, function(err, result) { | ||
test.equal(null, err); | ||
var cursor = collection.find({}); | ||
cursor.nextObject(function(err, item) { | ||
test.equal(0, item.a) | ||
// Rewind the cursor | ||
cursor.rewind(); | ||
// Grab the first object | ||
cursor.nextObject(function(err, item) { | ||
test.equal(0, item.a) | ||
test.done(); | ||
}) | ||
}) | ||
}) | ||
}); | ||
}, | ||
// run this last | ||
@@ -982,0 +1014,0 @@ noGlobalsLeaked: function(test) { |
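
The new cursor test relies on `cursor.rewind()`, which resets a partially consumed cursor so iteration starts from the first document again. A brief sketch, assuming `collection` is open and already contains documents:

    var cursor = collection.find({});
    cursor.nextObject(function(err, first) {
      console.log('first pass:', first.a);
      // Reset the cursor state and read from the beginning again
      cursor.rewind();
      cursor.nextObject(function(err, again) {
        console.log('after rewind:', again.a);   // same document as the first pass
      });
    });
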
@@ -90,3 +90,2 @@ var mongodb = process.env['TEST_NATIVE'] != null ? require('../lib/mongodb').native() : require('../lib/mongodb').pure(); | ||
} | ||
}) | ||
@@ -93,0 +92,0 @@ |
@@ -98,13 +98,10 @@ var mongodb = process.env['TEST_NATIVE'] != null ? require('../lib/mongodb').native() : require('../lib/mongodb').pure(); | ||
shouldCorrectlyPerformAutomaticConnect : function(test) { | ||
var automatic_connect_client = new Db(MONGODB, new Server("127.0.0.1", 27017, {auto_reconnect: true}), {native_parser: (process.env['TEST_NATIVE'] != null)}); | ||
var automatic_connect_client = new Db(MONGODB, new Server("127.0.0.1", 27017, {auto_reconnect: true}), {native_parser: (process.env['TEST_NATIVE'] != null), retryMiliSeconds:50}); | ||
automatic_connect_client.bson_deserializer = client.bson_deserializer; | ||
automatic_connect_client.bson_serializer = client.bson_serializer; | ||
automatic_connect_client.pkFactory = client.pkFactory; | ||
automatic_connect_client.open(function(err, automatic_connect_client) { | ||
// Listener for closing event | ||
var closeListener = function(has_error) { | ||
// Remove the listener for the close to avoid loop | ||
automatic_connect_client.removeListener("close", closeListener); | ||
// Let's insert a document | ||
@@ -133,19 +130,2 @@ automatic_connect_client.collection('test_object_id_generation.data2', function(err, collection) { | ||
// Test that error conditions are handled correctly | ||
shouldCorrectlyHandleConnectionErrors : function(test) { | ||
// Test error handling for single server connection | ||
var serverConfig = new Server("127.0.0.1", 21017, {auto_reconnect: true}); | ||
var error_client = new Db(MONGODB, serverConfig, {native_parser: (process.env['TEST_NATIVE'] != null) ? true : false}); | ||
error_client.on("error", function(err) {}); | ||
error_client.on("close", function(connection) { | ||
test.ok(typeof connection == typeof serverConfig); | ||
test.equal("127.0.0.1", connection.host); | ||
test.equal(21017, connection.port); | ||
test.equal(true, connection.autoReconnect); | ||
test.done(); | ||
}); | ||
error_client.open(function(err, error_client) {}); | ||
}, | ||
shouldCorrectlyExecuteEvalFunctions : function(test) { | ||
@@ -152,0 +132,0 @@ client.eval('function (x) {return x;}', [3], function(err, result) { |
@@ -88,3 +88,3 @@ var mongodb = process.env['TEST_NATIVE'] != null ? require('../lib/mongodb').native() : require('../lib/mongodb').pure(); | ||
error_client.error(function(err, documents) { | ||
error_client.error(function(err, documents) { | ||
test.equal(true, documents[0].ok); | ||
@@ -98,3 +98,3 @@ test.equal(0, documents[0].n); | ||
test.ok(err instanceof Error); | ||
test.ok('notConnected' === err.message); | ||
test.equal('no open connections', err.message); | ||
test.done(); | ||
@@ -240,15 +240,15 @@ }); | ||
test.ok(err instanceof Error); | ||
test.ok('notConnected' === err.message); | ||
test.equal('no open connections', err.message); | ||
collection.update({ inserted: true }, { inserted: true, x: 1 }, { safe: true }, function (err) { | ||
test.ok(err instanceof Error); | ||
test.ok('notConnected' === err.message); | ||
test.equal('no open connections', err.message); | ||
collection.remove({ inserted: true }, { safe: true }, function (err) { | ||
test.ok(err instanceof Error); | ||
test.ok('notConnected' === err.message); | ||
test.equal('no open connections', err.message); | ||
collection.findOne({ works: true }, function (err) { | ||
test.ok(err instanceof Error); | ||
test.ok('notConnected' === err.message); | ||
test.equal('no open connections', err.message); | ||
test.done(); | ||
@@ -258,4 +258,3 @@ }); | ||
}); | ||
}); | ||
}); | ||
}); | ||
@@ -273,5 +272,5 @@ }); | ||
test.ok(err == null); | ||
var query = {a:{$within:{$box:[[1,-10],[80,120]]}}}; | ||
// We don't have a geospatial index at this point | ||
@@ -288,3 +287,3 @@ collection.findOne(query, function(err, docs) { | ||
var invalidQuery = {a:{$within:{$box:[[-10,-180],[10,180]]}}}; | ||
client.admin().serverInfo(function(err, result){ | ||
@@ -298,5 +297,5 @@ collection.findOne(invalidQuery, function(err, doc) { | ||
} | ||
test.done(); | ||
}); | ||
}); | ||
}); | ||
@@ -310,3 +309,3 @@ }); | ||
}, | ||
noGlobalsLeaked : function(test) { | ||
@@ -313,0 +312,0 @@ var leaks = gleak.detectNew(); |
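
The expected error message for operations issued without a live connection changes from `notConnected` to `no open connections`. A small sketch of checking for it, assuming `collection` was obtained before the client lost its connections:

    collection.findOne({works: true}, function(err, doc) {
      if(err instanceof Error && err.message === 'no open connections') {
        // The client has no live sockets: reopen the Db or retry once the server is back
      }
    });
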
@@ -50,4 +50,3 @@ var mongodb = process.env['TEST_NATIVE'] != null ? require('../lib/mongodb').native() : require('../lib/mongodb').pure(); | ||
client.collection('shouldCorrectlyHandleThrownError', function(err, collection) { | ||
test.done(); | ||
//debug(someUndefinedVariable); | ||
debug(someUndefinedVariable); | ||
}); | ||
@@ -71,4 +70,3 @@ } catch (err) { | ||
collection.rename("shouldCorrectlyHandleThrownErrorInRename2", function(err, result) { | ||
test.done(); | ||
//debug(someUndefinedVariable); | ||
debug(someUndefinedVariable); | ||
}) | ||
@@ -75,0 +73,0 @@ }); |
@@ -47,52 +47,3 @@ var mongodb = process.env['TEST_NATIVE'] != null ? require('../lib/mongodb').native() : require('../lib/mongodb').pure(); | ||
}, | ||
'Error thrown in handler test': function(test){ | ||
// Should not be called | ||
var exceptionHandler = function(exception) { | ||
numberOfFailsCounter++; | ||
//console.log('Exception caught: ' + exception.message); | ||
//console.log(inspect(exception.stack.toString())) | ||
}; | ||
// Number of times we should fail | ||
var numberOfFailsCounter = 0; | ||
process.on('uncaughtException', exceptionHandler) | ||
client.createCollection('error_test', function(err, collection) { | ||
var testObject = {}; | ||
for(var i = 0; i < 5000; i++){ | ||
testObject['setting_' + i] = i; | ||
} | ||
testObject.name = 'test1'; | ||
var c = 0; | ||
var counter = 0; | ||
collection.insert([testObject, {name:'test2'}], {safe:true}, function(err, doc) { | ||
var findOne = function(){ | ||
collection.findOne({name: 'test1'}, function(err, doc) { | ||
counter++; | ||
if(counter > POOL_SIZE){ | ||
process.removeListener('uncaughtException', exceptionHandler); | ||
collection.findOne({name: 'test1'}, function(err, doc) { | ||
test.equal(5002, Object.keys(doc).length) | ||
test.equal(4, numberOfFailsCounter); | ||
test.done(); | ||
}); | ||
} else { | ||
process.nextTick(findOne); | ||
throw new Error('Some error'); | ||
} | ||
}); | ||
}; | ||
findOne(); | ||
}); | ||
}); | ||
}, | ||
// Test a simple find | ||
@@ -150,6 +101,6 @@ shouldCorrectlyPerformSimpleFind : function(test) { | ||
test.equal(2, documents.length); | ||
collection.count(function(err, count) { | ||
test.equal(2, count); | ||
// Fetch values by selection | ||
@@ -911,18 +862,87 @@ collection.find({'a': doc1.a}).toArray(function(err, documents) { | ||
// Should correctly execute findAndModify that is breaking in prod | ||
// shouldCorrectlyExecuteFindAndModify : function(test) { | ||
// client.createCollection('shouldCorrectlyExecuteFindAndModify', function(err, collection) { | ||
// var self = {_id : new client.bson_serializer.ObjectID()} | ||
// var _uuid = 'sddffdss' | ||
// | ||
// collection.findAndModify( | ||
// {_id: self._id, 'plays.uuid': _uuid}, | ||
// [], | ||
// {$set : {'plays.$.active': true}}, | ||
// {new: true, fields: {plays: 0, results: 0}, safe: true}, | ||
// function(err, contest) { | ||
// test.done(); | ||
// }) | ||
// }); | ||
// }, | ||
shouldCorrectlyExecuteFindAndModify : function(test) { | ||
client.createCollection('shouldCorrectlyExecuteFindAndModify', function(err, collection) { | ||
var self = {_id : new client.bson_serializer.ObjectID()} | ||
var _uuid = 'sddffdss' | ||
collection.findAndModify( | ||
{_id: self._id, 'plays.uuid': _uuid}, | ||
[], | ||
{$set : {'plays.$.active': true}}, | ||
{new: true, fields: {plays: 0, results: 0}, safe: true}, | ||
function(err, contest) { | ||
test.done(); | ||
}) | ||
}); | ||
}, | ||
'Should correctly return record with 64-bit id' : function(test) { | ||
client.createCollection('should_correctly_return_record_with_64bit_id', function(err, collection) { | ||
var _lowerId = new client.bson_serializer.ObjectID(); | ||
var _higherId = new client.bson_serializer.ObjectID(); | ||
var lowerId = new client.bson_serializer.Long.fromString('133118461172916224', 10); | ||
var higherId = new client.bson_serializer.Long.fromString('133118461172916225', 10); | ||
var lowerDoc = {_id:_lowerId, id: lowerId}; | ||
var higherDoc = {_id:_higherId, id: higherId}; | ||
collection.insert([lowerDoc, higherDoc], {safe:true}, function(err, result) { | ||
test.ok(err == null); | ||
// Select record with id of 133118461172916225 using $gt directive | ||
collection.find({id: {$gt: lowerId}}, {}, function(err, cur) { | ||
test.ok(err == null); | ||
cur.toArray(function(err, arr) { | ||
test.ok(err == null); | ||
test.equal(arr.length, 1, 'Selecting record via $gt directive on 64-bit integer should return a record with higher Id') | ||
test.equal(arr[0].id.toString(), '133118461172916225', 'Returned Id should be equal to 133118461172916225') | ||
test.done() | ||
}); | ||
}); | ||
}); | ||
}); | ||
}, | ||
'Should Correctly find a Document using findOne excluding _id field' : function(test) { | ||
client.createCollection('Should_Correctly_find_a_Document_using_findOne_excluding__id_field', function(err, collection) { | ||
var doc = {_id : new client.bson_serializer.ObjectID(), a:1, c:2} | ||
// insert doc | ||
collection.insert(doc, {safe:true}, function(err, result) { | ||
// Get one document, excluding the _id field | ||
collection.findOne({a:1}, {fields:{'_id': 0}}, function(err, item) { | ||
test.equal(null, item._id); | ||
test.equal(1, item.a); | ||
test.equal(2, item.c); | ||
collection.find({a:1}, {fields:{'_id':0}}).toArray(function(err, items) { | ||
var item = items[0] | ||
test.equal(null, item._id); | ||
test.equal(1, item.a); | ||
test.equal(2, item.c); | ||
test.done(); | ||
}) | ||
}) | ||
}); | ||
}); | ||
}, | ||
'Should correctly execute find and findOne queries in the same way' : function(test) { | ||
client.createCollection('Should_correctly_execute_find_and_findOne_queries_in_the_same_way', function(err, collection) { | ||
var doc = {_id : new client.bson_serializer.ObjectID(), a:1, c:2, comments:[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]}; | ||
// insert doc | ||
collection.insert(doc, {safe:true}, function(err, result) { | ||
collection.find({_id: doc._id}, {comments: {$slice: -5}}).toArray(function(err, docs) { | ||
test.equal(5, docs[0].comments.length) | ||
collection.findOne({_id: doc._id}, {comments: {$slice: -5}}, function(err, item) { | ||
test.equal(5, item.comments.length) | ||
test.done(); | ||
}); | ||
}); | ||
}); | ||
}); | ||
}, | ||
noGlobalsLeaked : function(test) { | ||
@@ -929,0 +949,0 @@ var leaks = gleak.detectNew(); |
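
Among the new find tests, the 64-bit id case builds `Long` values through the client's BSON serializer so that `$gt` compares the full 64-bit value. A hedged sketch of the same pattern, assuming `client` is an open Db and `collection` an open collection:

    var Long = client.bson_serializer.Long;
    var lowerId  = Long.fromString('133118461172916224', 10);
    var higherId = Long.fromString('133118461172916225', 10);

    collection.insert([{id: lowerId}, {id: higherId}], {safe: true}, function(err, result) {
      // $gt on the Long value selects only the higher id, without double-precision loss
      collection.find({id: {$gt: lowerId}}).toArray(function(err, docs) {
        console.log(docs.length, docs[0].id.toString());   // 1 '133118461172916225'
      });
    });
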
@@ -230,3 +230,2 @@ var mongodb = process.env['TEST_NATIVE'] != null ? require('../../lib/mongodb').native() : require('../../lib/mongodb').pure(); | ||
for(var i = 0; i < data.length; i++) { | ||
// debug(" i = " + i) | ||
test.equal(data2[i], data[i]) | ||
@@ -233,0 +232,0 @@ test.equal(streamData[i], data[i]) |
@@ -657,2 +657,21 @@ var mongodb = process.env['TEST_NATIVE'] != null ? require('../../lib/mongodb').native() : require('../../lib/mongodb').pure(); | ||
shouldCheckExistsByUsingRegexp : function(test) { | ||
var gridStore = new GridStore(client, 'shouldCheckExistsByUsingRegexp.txt', 'w'); | ||
gridStore.open(function(err, gridStore) { | ||
gridStore.write('bar', function(err, gridStore) { | ||
gridStore.close(function(err, result) { | ||
GridStore.exist(client, /shouldCheck/, function(err, result) { | ||
test.equal(null, err); | ||
test.equal(true, result); | ||
client.close(); | ||
test.done(); | ||
}); | ||
}); | ||
}); | ||
}); | ||
}, | ||
noGlobalsLeaked : function(test) { | ||
@@ -659,0 +678,0 @@ var leaks = gleak.detectNew(); |
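
`GridStore.exist` now accepts a regular expression as the file name argument. A brief sketch, assuming `client` is an open Db and `GridStore` is required from the driver as in the surrounding tests; the file name is a placeholder:

    var gridStore = new GridStore(client, 'report-2011-11.txt', 'w');
    gridStore.open(function(err, gridStore) {
      gridStore.write('bar', function(err, gridStore) {
        gridStore.close(function(err, result) {
          // Match any stored file whose name matches the pattern
          GridStore.exist(client, /report-/, function(err, exists) {
            console.log(exists);   // true
          });
        });
      });
    });
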
@@ -546,3 +546,3 @@ var mongodb = process.env['TEST_NATIVE'] != null ? require('../lib/mongodb').native() : require('../lib/mongodb').pure(); | ||
// Insert the update | ||
collection.insert({i:1, z:func }, {safe:true}, function(err, result) { | ||
collection.insert({i:1, z:func }, {safe:true, serializeFunctions:true}, function(err, result) { | ||
collection.findOne({_id:result[0]._id}, function(err, object) { | ||
@@ -740,2 +740,81 @@ test.equal(func.toString(), object.z.code); | ||
'Should Correctly allow for control of serialization of functions on command level' : function(test) { | ||
var doc = { | ||
str : "String", | ||
func : function() {} | ||
} | ||
client.createCollection("Should_Correctly_allow_for_control_of_serialization_of_functions_on_command_level", function(err, collection) { | ||
test.ok(err == null); | ||
collection.insert(doc, {safe:true}, function(err, result) { | ||
collection.update({str:"String"}, {$set:{c:1, d:function(){}}}, {safe:true, serializeFunctions:false}, function(err, result) { | ||
test.equal(1, result); | ||
collection.findOne({str:"String"}, function(err, item) { | ||
test.equal(null, item.d); | ||
// Execute a safe insert with replication to two servers | ||
collection.findAndModify({str:"String"}, [['a', 1]], {'$set':{'f':function() {}}}, {new:true, safe: true, serializeFunctions:true}, function(err, result) { | ||
test.ok(result.f instanceof client.bson_deserializer.Code) | ||
test.done(); | ||
}) | ||
}) | ||
}) | ||
}); | ||
}); | ||
}, | ||
'Should Correctly allow for control of serialization of functions on collection level' : function(test) { | ||
var doc = { | ||
str : "String", | ||
func : function() {} | ||
} | ||
client.createCollection("Should_Correctly_allow_for_control_of_serialization_of_functions_on_collection_level", {serializeFunctions:true}, function(err, collection) { | ||
test.ok(err == null); | ||
collection.insert(doc, {safe:true}, function(err, result) { | ||
test.equal(null, err); | ||
collection.findOne({str : "String"}, function(err, item) { | ||
test.ok(item.func instanceof client.bson_deserializer.Code); | ||
test.done(); | ||
}); | ||
}); | ||
}); | ||
}, | ||
'Should Correctly allow for using a Date object as _id' : function(test) { | ||
var doc = { | ||
_id : new Date(), | ||
str : 'hello' | ||
} | ||
client.createCollection("Should_Correctly_allow_for_using_a_Date_object_as__id", {serializeFunctions:true}, function(err, collection) { | ||
test.ok(err == null); | ||
collection.insert(doc, {safe:true}, function(err, result) { | ||
test.equal(null, err); | ||
collection.findOne({str : "hello"}, function(err, item) { | ||
test.ok(item._id instanceof Date); | ||
test.done(); | ||
}); | ||
}); | ||
}); | ||
}, | ||
'Should Correctly fail to update returning 0 results' : function(test) { | ||
client.createCollection("Should_Correctly_fail_to_update_returning_0_results", {serializeFunctions:true}, function(err, collection) { | ||
test.ok(err == null); | ||
collection.update({a:1}, {$set: {a:1}}, {safe:true}, function(err, numberOfUpdated) { | ||
test.equal(0, numberOfUpdated); | ||
test.done(); | ||
}); | ||
}); | ||
}, | ||
noGlobalsLeaked : function(test) { | ||
@@ -742,0 +821,0 @@ var leaks = gleak.detectNew(); |
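
Function serialization is now controllable per command (`{serializeFunctions: true}` on insert, update and findAndModify) and per collection. A hedged sketch of the collection-level switch, mirroring the new tests; `client` is assumed to be an open Db:

    // Opt the whole collection into storing functions as BSON Code values
    client.createCollection('func_docs', {serializeFunctions: true}, function(err, collection) {
      collection.insert({str: 'String', func: function() {}}, {safe: true}, function(err, result) {
        collection.findOne({str: 'String'}, function(err, item) {
          // The stored function comes back as a Code instance instead of being dropped
          console.log(item.func instanceof client.bson_deserializer.Code);   // true
        });
      });
    });
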
@@ -46,18 +46,18 @@ var mongodb = process.env['TEST_NATIVE'] != null ? require('../lib/mongodb').native() : require('../lib/mongodb').pure(); | ||
// shouldCorrectlyInsertSimpleRegExpDocument : function(test) { | ||
// var regexp = /foobar/i; | ||
// | ||
// client.createCollection('test_regex', function(err, collection) { | ||
// collection.insert({'b':regexp}, {safe:true}, function(err, ids) { | ||
// collection.find({}, {'fields': ['b']}, function(err, cursor) { | ||
// cursor.toArray(function(err, items) { | ||
// test.equal(("" + regexp), ("" + items[0].b)); | ||
// // Let's close the db | ||
// test.done(); | ||
// }); | ||
// }); | ||
// }); | ||
// }); | ||
// }, | ||
shouldCorrectlyInsertSimpleRegExpDocument : function(test) { | ||
var regexp = /foobar/i; | ||
client.createCollection('test_regex', function(err, collection) { | ||
collection.insert({'b':regexp}, {safe:true}, function(err, ids) { | ||
collection.find({}, {'fields': ['b']}, function(err, cursor) { | ||
cursor.toArray(function(err, items) { | ||
test.equal(("" + regexp), ("" + items[0].b)); | ||
// Let's close the db | ||
test.done(); | ||
}); | ||
}); | ||
}); | ||
}); | ||
}, | ||
shouldCorrectlyInsertSimpleUTF8Regexp : function(test) { | ||
@@ -64,0 +64,0 @@ var regexp = /foobaré/; |
@@ -0,4 +1,7 @@ | ||
var noReplicasetStart = process.env['NO_REPLICASET_START'] != null ? true : false; | ||
var testCase = require('../../deps/nodeunit').testCase, | ||
debug = require('util').debug | ||
debug = require('util').debug, | ||
inspect = require('util').inspect, | ||
gleak = require('../../tools/gleak'), | ||
ReplicaSetManager = require('../tools/replica_set_manager').ReplicaSetManager, | ||
@@ -12,2 +15,3 @@ Db = require('../../lib/mongodb').Db, | ||
var retries = 120; | ||
var RS = RS == null ? null : RS; | ||
@@ -27,5 +31,15 @@ var ensureConnection = function(test, numberOfTries, callback) { | ||
var db = new Db('integration_test_', replSet); | ||
db.open(function(err, p_db) { | ||
// Print any errors | ||
db.on("error", function(err) { | ||
console.log("============================= ensureConnection caught error") | ||
console.dir(err) | ||
if(err != null && err.stack != null) console.log(err.stack) | ||
db.close(); | ||
}) | ||
// Open the db | ||
db.open(function(err, p_db) { | ||
db.close(); | ||
if(err != null) { | ||
db.close(); | ||
// Wait for a sec and retry | ||
@@ -35,5 +49,5 @@ setTimeout(function() { | ||
ensureConnection(test, numberOfTries, callback); | ||
}, 1000); | ||
}, 3000); | ||
} else { | ||
return callback(null, p_db); | ||
return callback(null); | ||
} | ||
@@ -46,6 +60,6 @@ }) | ||
// Create instance of replicaset manager but only for the first call | ||
if(!serversUp) { | ||
if(!serversUp && !noReplicasetStart) { | ||
serversUp = true; | ||
RS = new ReplicaSetManager({retries:120}); | ||
RS.startSet(true, function(err, result) { | ||
RS = new ReplicaSetManager({retries:120, passive_count:0}); | ||
RS.startSet(true, function(err, result) { | ||
callback(); | ||
@@ -62,4 +76,4 @@ }); | ||
RS.restartKilledNodes(function(err, result) { | ||
callback(); | ||
}) | ||
callback(); | ||
}); | ||
}, | ||
@@ -81,2 +95,3 @@ | ||
test.done(); | ||
p_db.close(); | ||
}) | ||
@@ -98,11 +113,8 @@ }, | ||
db.on('close', function() { ++dbCloseCount; }); | ||
db.serverConfig.servers.forEach(function(server) { | ||
server.connection.on('close', function() { ++serverCloseCount; }); | ||
}); | ||
db.close(); | ||
setTimeout(function() { | ||
test.equal(dbCloseCount, 1); | ||
test.equal(serverCloseCount, db.serverConfig.servers.length); | ||
test.done(); | ||
}, 250); | ||
}, 2000); | ||
}) | ||
@@ -122,13 +134,10 @@ }, | ||
test.equal(null, err); | ||
var dbCloseCount = 0, serverCloseCount = 0; | ||
var dbCloseCount = 0;//, serverCloseCount = 0; | ||
db.on('close', function() { ++dbCloseCount; }); | ||
var connection = db.serverConfig.connection; | ||
db.serverConfig.servers.forEach(function(server) { | ||
server.connection.on('close', function() { ++serverCloseCount; }); | ||
}); | ||
db.close(function() { | ||
// Let all events fire. | ||
process.nextTick(function() { | ||
test.equal(dbCloseCount, 1); | ||
test.equal(serverCloseCount, db.serverConfig.servers.length); | ||
test.equal(dbCloseCount, 0); | ||
// test.equal(serverCloseCount, db.serverConfig.servers.length); | ||
test.done(); | ||
@@ -158,3 +167,10 @@ }); | ||
shouldConnectWithPrimarySteppedDown : function(test) { | ||
// debug("=========================================== shouldConnectWithPrimarySteppedDown") | ||
var replSet = new ReplSetServers( [ | ||
new Server( RS.host, RS.ports[1], { auto_reconnect: true } ), | ||
new Server( RS.host, RS.ports[0], { auto_reconnect: true } ), | ||
new Server( RS.host, RS.ports[2], { auto_reconnect: true } ) | ||
], | ||
{rs_name:RS.name} | ||
); | ||
// Step down primary server | ||
@@ -164,8 +180,10 @@ RS.stepDownPrimary(function(err, result) { | ||
ensureConnection(test, retries, function(err, p_db) { | ||
if(err != null) debug("shouldConnectWithPrimarySteppedDown :: " + inspect(err)); | ||
test.ok(err == null); | ||
test.equal(true, p_db.serverConfig.isConnected()); | ||
p_db.close(); | ||
test.done(); | ||
new Db('integration_test_', replSet).open(function(err, p_db) { | ||
test.ok(err == null); | ||
test.equal(true, p_db.serverConfig.isConnected()); | ||
p_db.close(); | ||
test.done(); | ||
}) | ||
}); | ||
@@ -176,3 +194,2 @@ }); | ||
shouldConnectWithThirdNodeKilled : function(test) { | ||
// debug("=========================================== shouldConnectWithThirdNodeKilled") | ||
RS.getNodeFromPort(RS.ports[2], function(err, node) { | ||
@@ -194,8 +211,9 @@ if(err != null) debug("shouldConnectWithThirdNodeKilled :: " + inspect(err)); | ||
ensureConnection(test, retries, function(err, p_db) { | ||
if(err != null) debug("shouldConnectWithThirdNodeKilled :: " + inspect(err)); | ||
test.ok(err == null); | ||
test.equal(true, p_db.serverConfig.isConnected()); | ||
new Db('integration_test_', replSet).open(function(err, p_db) { | ||
test.ok(err == null); | ||
test.equal(true, p_db.serverConfig.isConnected()); | ||
p_db.close(); | ||
test.done(); | ||
p_db.close(); | ||
test.done(); | ||
}) | ||
}); | ||
@@ -207,4 +225,4 @@ }); | ||
shouldConnectWithSecondaryNodeKilled : function(test) { | ||
// debug("=========================================== shouldConnectWithSecondaryNodeKilled") | ||
RS.killSecondary(function(node) { | ||
// Replica configuration | ||
@@ -220,4 +238,11 @@ var replSet = new ReplSetServers( [ | ||
var db = new Db('integration_test_', replSet); | ||
// Print any errors | ||
db.on("error", function(err) { | ||
console.log("============================= caught error") | ||
console.dir(err) | ||
if(err.stack != null) console.log(err.stack) | ||
test.done(); | ||
}) | ||
db.open(function(err, p_db) { | ||
if(err != null) debug("shouldConnectWithSecondaryNodeKilled :: " + inspect(err)); | ||
test.ok(err == null); | ||
@@ -227,3 +252,3 @@ test.equal(true, p_db.serverConfig.isConnected()); | ||
// Close and cleanup | ||
db.close(); | ||
p_db.close(); | ||
test.done(); | ||
@@ -235,3 +260,2 @@ }) | ||
shouldConnectWithPrimaryNodeKilled : function(test) { | ||
// debug("=========================================== shouldConnectWithPrimaryNodeKilled") | ||
RS.killPrimary(function(node) { | ||
@@ -246,13 +270,8 @@ // Replica configuration | ||
); | ||
var db = new Db('integration_test_', replSet); | ||
ensureConnection(test, retries, function(err, p_db) { | ||
if(err != null) debug("shouldConnectWithPrimaryNodeKilled :: " + inspect(err)); | ||
test.ok(err == null); | ||
test.equal(true, p_db.serverConfig.isConnected()); | ||
p_db.close(); | ||
if(err != null && err.stack != null) console.log(err.stack) | ||
test.done(); | ||
}); | ||
// }) | ||
}); | ||
}); | ||
@@ -262,3 +281,2 @@ }, | ||
shouldCorrectlyBeAbleToUsePortAccessors : function(test) { | ||
// debug("=========================================== shouldCorrectlyBeAbleToUsePortAccessors") | ||
// Replica configuration | ||
@@ -279,3 +297,3 @@ var replSet = new ReplSetServers( [ | ||
db.close(); | ||
p_db.close(); | ||
test.done(); | ||
@@ -286,3 +304,2 @@ }) | ||
shouldCorrectlyConnect: function(test) { | ||
// debug("=========================================== shouldCorrectlyConnect") | ||
// Replica configuration | ||
@@ -305,12 +322,12 @@ var replSet = new ReplSetServers( [ | ||
if(err != null) debug("shouldCorrectlyConnect :: " + inspect(err)); | ||
test.notEqual(null, primary); | ||
test.equal(primary, p_db.serverConfig.primary.host + ":" + p_db.serverConfig.primary.port); | ||
test.equal(primary, p_db.serverConfig.primary.host + ":" + p_db.serverConfig.primary.port); | ||
// Perform tests | ||
RS.secondaries(function(err, items) { | ||
if(err != null) debug("shouldCorrectlyConnect :: " + inspect(err)); | ||
// Test if we have the right secondaries | ||
test.deepEqual(items.sort(), p_db.serverConfig.secondaries.map(function(item) { | ||
test.deepEqual(items.sort(), p_db.serverConfig.allSecondaries.map(function(item) { | ||
return item.host + ":" + item.port; | ||
@@ -322,7 +339,6 @@ }).sort()); | ||
if(err != null) debug("shouldCorrectlyConnect :: " + inspect(err)); | ||
test.deepEqual(items.sort(), p_db.serverConfig.arbiters.map(function(item) { | ||
return item.host + ":" + item.port; | ||
}).sort()); | ||
// Force new instance | ||
@@ -332,5 +348,4 @@ var db2 = new Db('integration_test_', replSet ); | ||
if(err != null) debug("shouldCorrectlyConnect :: " + inspect(err)); | ||
test.equal(true, p_db2.serverConfig.isConnected()); | ||
// Close top instance | ||
@@ -346,2 +361,8 @@ db.close(); | ||
}, | ||
noGlobalsLeaked : function(test) { | ||
var leaks = gleak.detectNew(); | ||
test.equal(0, leaks.length, "global var leak detected: " + leaks.join(', ')); | ||
test.done(); | ||
} | ||
}) |
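
All of the replica-set suites build their topology the same way, and several now also attach an `error` listener to the Db so topology failures surface instead of throwing. A minimal connection sketch following those tests; `Db`, `Server` and `ReplSetServers` are assumed required from the driver, and the host and ports stand in for the values the test harness reads from `RS`:

    var replSet = new ReplSetServers([
      new Server('localhost', 30000, {auto_reconnect: true}),
      new Server('localhost', 30001, {auto_reconnect: true}),
      new Server('localhost', 30002, {auto_reconnect: true})
    ], {rs_name: 'testset'});

    var db = new Db('integration_test_', replSet);
    // Surface replica set errors instead of letting them throw
    db.on("error", function(err) {
      console.error('replica set error', err);
      db.close();
    });
    db.open(function(err, p_db) {
      if(err != null) return console.error(err);
      console.log('primary is ' + p_db.serverConfig.primary.host + ':' + p_db.serverConfig.primary.port);
      p_db.close();
    });
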
var noReplicasetStart = process.env['NO_REPLICASET_START'] != null ? true : false; | ||
var testCase = require('../../deps/nodeunit').testCase, | ||
debug = require('util').debug | ||
debug = require('util').debug, | ||
inspect = require('util').inspect, | ||
gleak = require('../../tools/gleak'), | ||
ReplicaSetManager = require('../tools/replica_set_manager').ReplicaSetManager, | ||
@@ -14,2 +15,3 @@ Db = require('../../lib/mongodb').Db, | ||
var retries = 120; | ||
var RS = RS == null ? null : RS; | ||
@@ -29,2 +31,11 @@ var ensureConnection = function(test, numberOfTries, callback) { | ||
var db = new Db('integration_test_', replSet); | ||
// Print any errors | ||
db.on("error", function(err) { | ||
console.log("============================= ensureConnection caught error") | ||
console.dir(err) | ||
if(err != null && err.stack != null) console.log(err.stack) | ||
db.close(); | ||
}) | ||
// Open the db | ||
db.open(function(err, p_db) { | ||
@@ -65,5 +76,4 @@ if(err != null) { | ||
RS.restartKilledNodes(function(err, result) { | ||
if(err != null) throw err; | ||
callback(); | ||
}) | ||
callback(); | ||
}); | ||
}, | ||
@@ -85,7 +95,5 @@ | ||
db.open(function(err, p_db) { | ||
// debug("====================================================== 1") | ||
if(err != null) debug("shouldRetrieveCorrectCountAfterInsertionReconnect :: " + inspect(err)); | ||
// if(err != null) debug("shouldRetrieveCorrectCountAfterInsertionReconnect :: " + inspect(err)); | ||
// Drop collection on replicaset | ||
p_db.dropCollection('testsets', function(err, r) { | ||
// debug("====================================================== 2") | ||
if(err != null) debug("shouldRetrieveCorrectCountAfterInsertionReconnect :: " + inspect(err)); | ||
@@ -95,3 +103,2 @@ | ||
p_db.createCollection('testsets', function(err, collection) { | ||
// debug("====================================================== 3") | ||
if(err != null) debug("shouldRetrieveCorrectCountAfterInsertionReconnect :: " + inspect(err)); | ||
@@ -101,3 +108,2 @@ | ||
collection.insert({a:20}, {safe: {w:2, wtimeout: 10000}}, function(err, r) { | ||
// debug("====================================================== 4") | ||
if(err != null) debug("shouldRetrieveCorrectCountAfterInsertionReconnect :: " + inspect(err)); | ||
@@ -107,3 +113,2 @@ | ||
collection.count(function(err, c) { | ||
// debug("====================================================== 5") | ||
if(err != null) debug("shouldRetrieveCorrectCountAfterInsertionReconnect :: " + inspect(err)); | ||
@@ -119,3 +124,2 @@ | ||
RS.killPrimary(function(node) { | ||
// debug("====================================================== 6") | ||
@@ -125,3 +129,2 @@ // Ensure valid connection | ||
ensureConnection(test, retries, function(err, p_db) { | ||
// debug("====================================================== 7") | ||
if(err != null) debug("shouldRetrieveCorrectCountAfterInsertionReconnect :: " + inspect(err)); | ||
@@ -133,11 +136,8 @@ | ||
p_db.collection('testsets', function(err, collection) { | ||
// debug("====================================================== 8") | ||
if(err != null) debug("shouldRetrieveCorrectCountAfterInsertionReconnect :: " + inspect(err)); | ||
collection.insert({a:30}, {safe:true}, function(err, r) { | ||
// debug("====================================================== 9") | ||
if(err != null) debug("shouldRetrieveCorrectCountAfterInsertionReconnect :: " + inspect(err)); | ||
collection.insert({a:40}, {safe:true}, function(err, r) { | ||
// debug("====================================================== 10") | ||
if(err != null) debug("shouldRetrieveCorrectCountAfterInsertionReconnect :: " + inspect(err)); | ||
@@ -147,3 +147,2 @@ | ||
collection.count(function(err, c) { | ||
// debug("====================================================== 11") | ||
if(err != null) debug("shouldRetrieveCorrectCountAfterInsertionReconnect :: " + inspect(err)); | ||
@@ -167,3 +166,9 @@ | ||
}) | ||
} | ||
}, | ||
noGlobalsLeaked : function(test) { | ||
var leaks = gleak.detectNew(); | ||
test.equal(0, leaks.length, "global var leak detected: " + leaks.join(', ')); | ||
test.done(); | ||
} | ||
}) | ||
@@ -170,0 +175,0 @@ |
var noReplicasetStart = process.env['NO_REPLICASET_START'] != null ? true : false; | ||
var testCase = require('../../deps/nodeunit').testCase, | ||
debug = require('util').debug | ||
debug = require('util').debug, | ||
inspect = require('util').inspect, | ||
gleak = require('../../tools/gleak'), | ||
ReplicaSetManager = require('../tools/replica_set_manager').ReplicaSetManager, | ||
@@ -15,6 +16,5 @@ Db = require('../../lib/mongodb').Db, | ||
var retries = 120; | ||
// var RS = null; | ||
var RS = RS == null ? null : RS; | ||
var ensureConnection = function(test, numberOfTries, callback) { | ||
// debug("=========================================== ensureConnection::" + numberOfTries) | ||
// Replica configuration | ||
@@ -32,2 +32,11 @@ var replSet = new ReplSetServers( [ | ||
var db = new Db('integration_test_', replSet); | ||
// Print any errors | ||
db.on("error", function(err) { | ||
console.log("============================= ensureConnection caught error") | ||
console.dir(err) | ||
if(err != null && err.stack != null) console.log(err.stack) | ||
db.close(); | ||
}) | ||
// Open the db | ||
db.open(function(err, p_db) { | ||
@@ -52,7 +61,4 @@ if(err != null) { | ||
serversUp = true; | ||
// RS = new ReplicaSetManager({retries:120, arbiter_count:0, passive_count:1}); | ||
RS = new ReplicaSetManager({retries:120}); | ||
RS = new ReplicaSetManager({retries:120, passive_count:1, arbiter_count:1}); | ||
RS.startSet(true, function(err, result) { | ||
if(err != null) throw err; | ||
// Finish setup | ||
callback(); | ||
@@ -62,5 +68,4 @@ }); | ||
RS.restartKilledNodes(function(err, result) { | ||
if(err != null) throw err; | ||
callback(); | ||
}) | ||
}) | ||
} | ||
@@ -71,7 +76,6 @@ }, | ||
RS.restartKilledNodes(function(err, result) { | ||
if(err != null) throw err; | ||
callback(); | ||
}) | ||
callback(); | ||
}); | ||
}, | ||
shouldCorrectlyWaitForReplicationToServersOnInserts : function(test) { | ||
@@ -102,4 +106,5 @@ // debug("=========================================== shouldWorkCorrectlyWithInserts") | ||
collection.insert({a:20}, {safe: {w:2, wtimeout: 10000}}, function(err, r) { | ||
test.equal(null, err); | ||
test.equal(null, err); | ||
test.done(); | ||
p_db.close(); | ||
}); | ||
@@ -110,4 +115,4 @@ }); | ||
}, | ||
shouldCorrectlyWaitForReplicationToServersOnInserts : function(test) { | ||
shouldCorrectlyThrowTimeoutForReplicationToServersOnInserts : function(test) { | ||
// debug("=========================================== shouldWorkCorrectlyWithInserts") | ||
@@ -140,2 +145,3 @@ // Replica configuration | ||
test.done(); | ||
p_db.close(); | ||
}); | ||
@@ -176,2 +182,3 @@ }); | ||
test.done(); | ||
p_db.close(); | ||
}) | ||
@@ -195,5 +202,20 @@ }); | ||
// Insert some data | ||
var db = new Db('integration_test_', replSet); | ||
// Print any errors | ||
db.on("error", function(err) { | ||
console.log("============================= ensureConnection caught error") | ||
console.dir(err) | ||
if(err != null && err.stack != null) console.log(err.stack) | ||
db.close(); | ||
}) | ||
var first = false; | ||
// Open db | ||
db.open(function(err, p_db) { | ||
if(first) return | ||
first = true | ||
// Check if we got an error | ||
@@ -214,13 +236,22 @@ if(err != null) debug("shouldWorkCorrectlyWithInserts :: " + inspect(err)); | ||
// Kill the primary | ||
RS.killPrimary(function(node) { | ||
RS.killPrimary(2, {killNodeWaitTime:10}, function(node) { | ||
// Attempt insert (should fail) | ||
collection.insert({a:30}, {safe: {w:2, wtimeout: 10000}}, function(err, r) { | ||
test.ok(err != null) | ||
// test.done(); | ||
if(err != null) { | ||
collection.insert({a:30}, {safe: true}, function(err, r) { | ||
collection.insert({a:40}, {safe: true}, function(err, r) { | ||
// Perform a count | ||
collection.count(function(err, count) { | ||
test.equal(2, count); | ||
p_db.close(); | ||
test.done(); | ||
}); | ||
}); | ||
} else { | ||
console.log("+++++++++++++++++++++++++++++++++++++++++++ FAILURE") | ||
p_db.close(); | ||
test.ok(false) | ||
} | ||
@@ -249,2 +280,10 @@ }); | ||
var db = new Db('integration_test_', replSet); | ||
// Print any errors | ||
db.on("error", function(err) { | ||
console.log("============================= ensureConnection caught error") | ||
console.dir(err) | ||
if(err != null && err.stack != null) console.log(err.stack) | ||
db.close(); | ||
}) | ||
// Open db | ||
db.open(function(err, p_db) { | ||
@@ -266,3 +305,3 @@ // Check if we got an error | ||
// Kill the primary | ||
RS.killPrimary(function(node) { | ||
RS.killPrimary(2, {killNodeWaitTime:1}, function(node) { | ||
// Ok let's execute same query a couple of times | ||
@@ -272,9 +311,3 @@ collection.find({}).toArray(function(err, items) { | ||
// debug(" 1 =============================== err :: " + inspect(err)) | ||
// debug(inspect(items)) | ||
collection.find({}).toArray(function(err, items) { | ||
// debug(" 2 =============================== err :: " + inspect(err)) | ||
// debug(inspect(items)) | ||
test.ok(err == null); | ||
@@ -284,8 +317,5 @@ test.equal(1, items.length); | ||
collection.find({}).toArray(function(err, items) { | ||
// debug(" 2 =============================== err :: " + inspect(err)) | ||
// debug(inspect(items)) | ||
test.ok(err == null); | ||
test.equal(1, items.length); | ||
p_db.close(); | ||
test.done(); | ||
@@ -341,3 +371,2 @@ }); | ||
RS.killPrimary(function(node) { | ||
// Ensure valid connection | ||
@@ -358,32 +387,29 @@ // Do inserts | ||
var group = this.group(); | ||
collection.save({a:30}, {safe:true}, group()); | ||
collection.save({a:40}, {safe:true}, group()); | ||
collection.save({a:50}, {safe:true}, group()); | ||
collection.save({a:60}, {safe:true}, group()); | ||
collection.save({a:70}, {safe:true}, group()); | ||
collection.save({a:30}, {safe:{w:2, wtimeout: 10000}}, group()); | ||
collection.save({a:40}, {safe:{w:2, wtimeout: 10000}}, group()); | ||
collection.save({a:50}, {safe:{w:2, wtimeout: 10000}}, group()); | ||
collection.save({a:60}, {safe:{w:2, wtimeout: 10000}}, group()); | ||
collection.save({a:70}, {safe:{w:2, wtimeout: 10000}}, group()); | ||
}, | ||
function finishUp(err, values) { | ||
function finishUp(err, values) { | ||
// Restart the old master and wait for the sync to happen | ||
RS.restartKilledNodes(function(err, result) { | ||
if(err != null) debug("shouldWorkCorrectlyWithInserts :: " + inspect(err)); | ||
if(err != null) throw err; | ||
if(err != null) debug("shouldWorkCorrectlyWithInserts :: " + inspect(err)); | ||
// Contains the results | ||
var results = []; | ||
// Just wait for the results | ||
setTimeout(function() { | ||
// Ensure the connection | ||
ensureConnection(test, retries, function(err, p_db) { | ||
if(err != null) debug("shouldWorkCorrectlyWithInserts :: " + inspect(err)); | ||
// Get the collection | ||
p_db.collection('testsets', function(err, collection) { | ||
if(err != null) debug("shouldWorkCorrectlyWithInserts :: " + inspect(err)); | ||
collection.find().each(function(err, item) { | ||
if(err != null) debug("shouldWorkCorrectlyWithInserts :: " + inspect(err)); | ||
if(item == null) { | ||
@@ -397,13 +423,13 @@ // Ensure we have the correct values | ||
}); | ||
// Run second check | ||
collection.save({a:80}, {safe:true}, function(err, r) { | ||
if(err != null) debug("shouldWorkCorrectlyWithInserts :: " + inspect(err)); | ||
collection.find().toArray(function(err, items) { | ||
if(err != null) debug("shouldWorkCorrectlyWithInserts :: " + inspect(err)); | ||
// Ensure we have the correct values | ||
test.equal(7, items.length); | ||
[20, 30, 40, 50, 60, 70, 80].forEach(function(a) { | ||
@@ -414,3 +440,3 @@ test.equal(1, items.filter(function(element) { | ||
}); | ||
p_db.close(); | ||
@@ -426,3 +452,3 @@ test.done(); | ||
}); | ||
}, 1000); | ||
}, 5000); | ||
}) | ||
@@ -440,3 +466,9 @@ } | ||
}) | ||
} | ||
}, | ||
noGlobalsLeaked : function(test) { | ||
var leaks = gleak.detectNew(); | ||
test.equal(0, leaks.length, "global var leak detected: " + leaks.join(', ')); | ||
test.done(); | ||
} | ||
}) | ||
@@ -443,0 +475,0 @@ |
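Two things change in the test above: `killPrimary` now takes a kill signal plus an options object, and the inserts issued during failover ask for replica-acknowledged writes. Below is a minimal sketch of that calling pattern; it assumes an `RS` replica set manager and an open `collection` exactly as set up earlier in the suite, and is not a complete test.

    // Sketch only: RS and collection come from the test setup above.
    // Signal 2 (SIGINT) is sent to the primary; killNodeWaitTime tells the
    // manager how long to wait (via setTimeout) before invoking the callback.
    RS.killPrimary(2, {killNodeWaitTime: 10}, function(node) {
      // The write must be acknowledged by 2 members, or fail with a
      // wtimeout while the set is still electing a new primary.
      collection.insert({a: 30}, {safe: {w: 2, wtimeout: 10000}}, function(err, result) {
        if(err != null) return console.log('insert failed during failover: ' + err.message);
        console.log('insert acknowledged by 2 members');
      });
    });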
var noReplicasetStart = process.env['NO_REPLICASET_START'] != null ? true : false; | ||
var testCase = require('../../deps/nodeunit').testCase, | ||
debug = require('util').debug | ||
debug = require('util').debug, | ||
inspect = require('util').inspect, | ||
gleak = require('../../tools/gleak'), | ||
ReplicaSetManager = require('../tools/replica_set_manager').ReplicaSetManager, | ||
@@ -15,3 +16,3 @@ Db = require('../../lib/mongodb').Db, | ||
var retries = 120; | ||
// var RS = null; | ||
var RS = RS == null ? null : RS; | ||
@@ -25,3 +26,3 @@ var ensureConnection = function(test, numberOfTries, callback) { | ||
], | ||
{rs_name:RS.name, read_secondary:true} | ||
{rs_name:RS.name} | ||
); | ||
@@ -31,3 +32,12 @@ | ||
var db = new Db('ruby-test-db', replSet); | ||
var db = new Db('integration_test_', replSet); | ||
// Print any errors | ||
db.on("error", function(err) { | ||
console.log("============================= ensureConnection caught error") | ||
console.dir(err) | ||
if(err != null && err.stack != null) console.log(err.stack) | ||
db.close(); | ||
}) | ||
// Open the db | ||
db.open(function(err, p_db) { | ||
@@ -52,3 +62,3 @@ if(err != null) { | ||
serversUp = true; | ||
RS = new ReplicaSetManager({retries:120}); | ||
RS = new ReplicaSetManager({retries:120, passive_count:1, arbiter_count:1}); | ||
RS.startSet(true, function(err, result) { | ||
@@ -133,89 +143,34 @@ if(err != null) throw err; | ||
); | ||
// Insert some data | ||
var db = new Db('ruby-test-db', replSet); | ||
var db = new Db('integration_test_', replSet); | ||
db.open(function(err, p_db) { | ||
if(err != null) debug("shouldReadPrimary :: " + inspect(err)); | ||
p_db.createCollection("test-sets", {safe:{w:3, wtimeout:10000}}, function(err, collection) { | ||
p_db.createCollection("testsets", {safe:{w:2, wtimeout:10000}}, function(err, collection) { | ||
if(err != null) debug("shouldReadPrimary :: " + inspect(err)); | ||
Step( | ||
function inserts() { | ||
var group = this.group(); | ||
collection.save({a:20}, group()); | ||
collection.save({a:30}, group()); | ||
collection.save({a:40}, group()); | ||
}, | ||
function done(err, values) { | ||
if(err != null) debug("shouldReadPrimary :: " + inspect(err)); | ||
var results = []; | ||
retryEnsure(60, function(done) { | ||
results = []; | ||
collection.find().each(function(err, item) { | ||
if(item == null) { | ||
var correct = 0; | ||
// Check all the values | ||
var r = [20, 30, 40]; | ||
for(var i = 0; i < r.length; i++) { | ||
correct += results.filter(function(element) { | ||
return element.a == r[i]; | ||
}).length; | ||
} | ||
return correct == 3 ? done(true) : done(false); | ||
} else { | ||
results.push(item); | ||
} | ||
}); | ||
}, function(err, result) { | ||
if(err != null) debug("shouldReadPrimary :: " + inspect(err)); | ||
test.ifError(err); | ||
// Ensure replication happened in time | ||
setTimeout(function() { | ||
// Kill the primary | ||
RS.killPrimary(function(node) { | ||
// | ||
// Retry again to read the docs with primary dead | ||
retryEnsure(60, function(done) { | ||
results = []; | ||
collection.find().each(function(err, item) { | ||
if(err != null) debug("shouldReadPrimary :: " + inspect(err)); | ||
if(item == null) { | ||
var correct = 0; | ||
// Check all the values | ||
var r = [20, 30, 40]; | ||
for(var i = 0; i < r.length; i++) { | ||
correct += results.filter(function(element) { | ||
return element.a == r[i]; | ||
}).length; | ||
} | ||
return correct == 3 ? done(true) : done(false); | ||
} else { | ||
results.push(item); | ||
} | ||
}); | ||
}, function(err, result) { | ||
// Check if we get a correct count | ||
collection.count(function(err, count) { | ||
test.ifError(err); | ||
test.equal(3, count) | ||
test.done(); | ||
p_db.close(); | ||
}); | ||
}) | ||
}); | ||
}, 2000); | ||
}) | ||
} | ||
); | ||
collection.insert([{a:20}, {a:30}, {a:40}], {safe:{w:2, wtimeout:10000}}, function(err, result) { | ||
// Ensure replication happened in time | ||
setTimeout(function() { | ||
// Kill the primary | ||
RS.killPrimary(function(node) { | ||
// Do a collection find | ||
collection.find().toArray(function(err, items) { | ||
test.equal(null, err); | ||
test.equal(3, items.length); | ||
test.done(); | ||
}); | ||
}); | ||
}, 2000); | ||
}) | ||
}); | ||
}) | ||
} | ||
}, | ||
noGlobalsLeaked : function(test) { | ||
var leaks = gleak.detectNew(); | ||
test.equal(0, leaks.length, "global var leak detected: " + leaks.join(', ')); | ||
test.done(); | ||
} | ||
}) | ||
@@ -222,0 +177,0 @@ |
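For context, the connection setup these tests share (partly elided by the hunks above) drops the `read_secondary` flag and switches the database name to `integration_test_`. A rough sketch of that setup follows; `RS`, `Server`, and `Db` appear in the diff itself, while the `ReplSetServers` constructor name is an assumption based on the driver of this era.

    // Sketch of the shared connection setup after the change.
    var replSet = new ReplSetServers([
        new Server(RS.host, RS.ports[1], { auto_reconnect: true }),
        new Server(RS.host, RS.ports[0], { auto_reconnect: true })
      ],
      {rs_name: RS.name}  // read_secondary is no longer passed here
    );
    var db = new Db('integration_test_', replSet);
    // Close on errors so a dying primary cannot hang the test run
    db.on("error", function(err) {
      db.close();
    });
    db.open(function(err, p_db) {
      // p_db is usable once the driver has connected to the set
    });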
var noReplicasetStart = process.env['NO_REPLICASET_START'] != null ? true : false; | ||
var testCase = require('../../deps/nodeunit').testCase, | ||
debug = require('util').debug | ||
debug = require('util').debug, | ||
inspect = require('util').inspect, | ||
gleak = require('../../tools/gleak'), | ||
ReplicaSetManager = require('../tools/replica_set_manager').ReplicaSetManager, | ||
@@ -15,2 +16,3 @@ Db = require('../../lib/mongodb').Db, | ||
var retries = 120; | ||
var RS = RS == null ? null : RS; | ||
@@ -79,3 +81,3 @@ var ensureConnection = function(test, numberOfTries, callback) { | ||
new Server( RS.host, RS.ports[0], { auto_reconnect: true } ), | ||
new Server( RS.host, RS.ports[2], { auto_reconnect: true } ) | ||
// new Server( RS.host, RS.ports[2], { auto_reconnect: true } ) | ||
], | ||
@@ -99,3 +101,2 @@ {rs_name:RS.name, read_secondary:false} | ||
collection.insert({a:20}, {safe: {w:1, wtimeout: 10000}}, function(err, r) { | ||
// Execute a findAndModify | ||
@@ -112,2 +113,8 @@ collection.findAndModify({'a':20}, [['a', 1]], {'$set':{'b':3}}, {'new':true, safe: {w:7, wtimeout: 10000}}, function(err, updated_doc) { | ||
}, | ||
noGlobalsLeaked : function(test) { | ||
var leaks = gleak.detectNew(); | ||
test.equal(0, leaks.length, "global var leak detected: " + leaks.join(', ')); | ||
test.done(); | ||
} | ||
}) | ||
@@ -114,0 +121,0 @@ |
@@ -7,2 +7,3 @@ var mongodb = process.env['TEST_NATIVE'] != null ? require('../lib/mongodb').native() : require('../lib/mongodb').pure(); | ||
nodeunit = require('../deps/nodeunit'), | ||
gleak = require('../tools/gleak'), | ||
Db = mongodb.Db, | ||
@@ -116,3 +117,9 @@ Cursor = mongodb.Cursor, | ||
}); | ||
}, | ||
}, | ||
noGlobalsLeaked : function(test) { | ||
var leaks = gleak.detectNew(); | ||
test.equal(0, leaks.length, "global var leak detected: " + leaks.join(', ')); | ||
test.done(); | ||
} | ||
}) | ||
@@ -119,0 +126,0 @@ |
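Each suite now requires the bundled gleak tool and registers `noGlobalsLeaked` as its final test; because `detectNew` only reports globals created since its previous call, running it last flags anything the earlier tests leaked. A minimal sketch of how such a suite is wired up is shown below; the surrounding test name and export style are placeholders.

    // Sketch: a nodeunit testCase whose last test checks for leaked globals.
    var testCase = require('../deps/nodeunit').testCase,
        gleak = require('../tools/gleak');

    module.exports = testCase({
      someDriverTest : function(test) {
        // ... exercise the driver here ...
        test.done();
      },

      noGlobalsLeaked : function(test) {
        var leaks = gleak.detectNew();
        test.equal(0, leaks.length, "global var leak detected: " + leaks.join(', '));
        test.done();
      }
    });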
@@ -29,3 +29,3 @@ var debug = require('util').debug, | ||
this.passiveCount = options["passive_count"] != null ? options["passive_count"] : 1; | ||
this.primaryCount = 1; | ||
this.primaryCount = options["primary_count"] != null ? options["primary_count"] : 1; | ||
this.keyPath = [process.cwd(), "test", "tools", "keyfile.txt"].join("/"); | ||
@@ -39,15 +39,4 @@ fs.chmodSync(this.keyPath, 0600); | ||
// Keeps all the mongod instances | ||
this.mongods = {}; | ||
var self = this; | ||
// Add a handler for errors that bubble up all the way | ||
// process.on('uncaughtException', function (err) { | ||
// debug("============================================================= uncaught Exception") | ||
// debug(inspect(err)) | ||
// // Kill all mongod servers and cleanup before exiting | ||
// self.killAll(function() { | ||
// // Force exit | ||
// process.exit(); | ||
// }) | ||
// }); | ||
} | ||
@@ -78,2 +67,3 @@ | ||
var members = status["members"]; | ||
// Get the correct state members
@@ -136,3 +126,2 @@ var nodes = members.filter(function(value) { | ||
// Ensure all the members are up | ||
// process.stdout.write("** Ensuring members are up..."); | ||
debug("** Ensuring members are up..."); | ||
@@ -153,2 +142,3 @@ // Let's ensure everything is up | ||
var self = this; | ||
var done = false; | ||
// Get master connection | ||
@@ -172,4 +162,8 @@ self.getConnection(function(err, connection) { | ||
} else { | ||
self.numberOfInitiateRetries = 0; | ||
callback(null, null); | ||
// Make sure we only do this once, even if some messages are late | ||
if(!done) { | ||
done = true; | ||
self.numberOfInitiateRetries = 0; | ||
callback(null, null); | ||
} | ||
} | ||
@@ -195,2 +189,5 @@ }); | ||
// Set priority off server in config | ||
var priority = typeof fields === 'object' ? fields.priority : null; | ||
// Add extra fields provided | ||
@@ -201,9 +198,4 @@ for(var name in fields) { | ||
// debug("================================================== initNode") | ||
// debug(inspect(this.mongods[n])); | ||
// Perform cleanup of directories | ||
exec("rm -rf " + self.mongods[n]["db_path"], function(err, stdout, stderr) { | ||
// debug("======================================== err1::" + err) | ||
if(err != null) return callback(err, null); | ||
@@ -213,8 +205,3 @@ | ||
exec("mkdir -p " + self.mongods[n]["db_path"], function(err, stdout, stderr) { | ||
// debug("======================================== err2::" + err) | ||
if(err != null) return callback(err, null); | ||
// debug("= ======================================= start1::" + self.mongods[n]["start"]) | ||
self.mongods[n]["start"] = self.startCmd(n); | ||
@@ -224,5 +211,11 @@ self.start(n, function() { | ||
var member = {"_id": n, "host": self.host + ":" + self.mongods[n]["port"]}; | ||
// Set it to arbiter if it's been passed | ||
if(self.mongods[n]['arbiterOnly']) { | ||
member['arbiterOnly'] = true; | ||
} | ||
// Set priority level if it's defined | ||
if(priority != null) { | ||
member['priority'] = priority; | ||
} | ||
// Push member to config | ||
self.config["members"].push(member); | ||
@@ -242,3 +235,3 @@ // Return | ||
ReplicaSetManager.prototype.kill = function(node, signal, callback) { | ||
ReplicaSetManager.prototype.kill = function(node, signal, options, callback) { | ||
var self = this; | ||
@@ -249,2 +242,5 @@ // Unpack callback and variables | ||
signal = args.length ? args.shift() : 2; | ||
options = args.length ? args.shift() : {}; | ||
// kill node wait time | ||
var killNodeWaitTime = options.killNodeWaitTime == null ? self.killNodeWaitTime : options.killNodeWaitTime; | ||
@@ -256,6 +252,6 @@ debug("** Killing node with pid " + this.mongods[node]["pid"] + " at port " + this.mongods[node]['port']); | ||
function (error, stdout, stderr) { | ||
console.log('stdout: ' + stdout); | ||
console.log('stderr: ' + stderr); | ||
debug('stdout: ' + stdout); | ||
debug('stderr: ' + stderr); | ||
if (error !== null) { | ||
console.log('exec error: ' + error); | ||
debug('exec error: ' + error); | ||
} | ||
@@ -265,7 +261,7 @@ | ||
// Wait for 5 seconds to give the server time to die a proper death | ||
setTimeout(callback, self.killNodeWaitTime); | ||
setTimeout(callback, killNodeWaitTime); | ||
}); | ||
} | ||
ReplicaSetManager.prototype.killPrimary = function(signal, callback) { | ||
ReplicaSetManager.prototype.killPrimary = function(signal, options, callback) { | ||
var self = this; | ||
@@ -276,11 +272,17 @@ // Unpack callback and variables | ||
signal = args.length ? args.shift() : 2; | ||
options = args.length ? args.shift() : {}; | ||
var done = false; | ||
this.getNodeWithState(1, function(err, node) { | ||
if(err != null) return callback(err, null); | ||
if(!done) { | ||
// Ensure no double callbacks due to later scheduled connections returning | ||
done = true; | ||
if(err != null) return callback(err, null); | ||
// Kill process and return node reference | ||
self.kill(node, signal, function() { | ||
// Wait for a while before passing back | ||
callback(null, node); | ||
}) | ||
// Kill process and return node reference | ||
self.kill(node, signal, options, function() { | ||
// Wait for a while before passing back | ||
callback(null, node); | ||
}) | ||
} | ||
}); | ||
@@ -291,9 +293,14 @@ } | ||
var self = this; | ||
var done = false; | ||
this.getNodeWithState(2, function(err, node) { | ||
if(err != null) return callback(err, null); | ||
// Kill process and return node reference | ||
self.kill(node, function() { | ||
callback(null, node); | ||
}) | ||
if(!done) { | ||
// Ensure no double callbacks due to later scheduled connections returning | ||
done = true; | ||
if(err != null) return callback(err, null); | ||
// Kill process and return node reference | ||
self.kill(node, function() { | ||
callback(null, node); | ||
}) | ||
} | ||
}); | ||
@@ -304,11 +311,15 @@ } | ||
var self = this; | ||
// Get the primary node | ||
this.getNodeWithState(1, function(err, primary) { | ||
// Return error | ||
if(err) return callback(err, null); | ||
if(primary == null) return callback(new Error("No primary found"), null); | ||
// Get the connection for the primary | ||
self.getConnection(primary, function(err, connection) { | ||
// Return any errors | ||
if(err) return callback(err, null); | ||
// Closes the connection so never gets a response | ||
// Execute stepdown process | ||
connection.admin().command({"replSetStepDown": 90}); | ||
// Call back | ||
return callback(null, null); | ||
// Return the callback | ||
return callback(null, connection); | ||
}); | ||
@@ -352,67 +363,166 @@ }); | ||
var self = this; | ||
var numberOfInitiateRetries = this.retries; | ||
var done = false; | ||
// Write out the ensureUp | ||
// process.stdout.write("."); | ||
if(!self.up) process.stdout.write("."); | ||
// Retry check for server up, sleeping in between
self.retriedConnects = 0; | ||
// Attempt to retrieve a connection
self.getConnection(function(err, connection) { | ||
// If we have an error or no connection object retry | ||
if(err != null || connection == null) { | ||
setTimeout(function() { | ||
self.ensureUpRetries++; | ||
self.ensureUp(callback); | ||
}, 1000) | ||
// Return | ||
return; | ||
// Actual function doing testing | ||
var ensureUpFunction = function() { | ||
if(!done) { | ||
if(!self.up) process.stdout.write("."); | ||
// Attempt to retrieve a connection
self.getConnection(function(err, connection) { | ||
// Adjust the number of retries | ||
numberOfInitiateRetries = numberOfInitiateRetries - 1 | ||
// If have no more retries stop | ||
if(numberOfInitiateRetries == 0) { | ||
// Set that we are done | ||
done = true; | ||
// perform callback | ||
return callback(new Error("Servers did not come up again"), null);
} | ||
// We have a connection, execute command and update server object | ||
if(err == null && connection != null) { | ||
// Check repl set get status | ||
connection.admin().command({"replSetGetStatus": 1}, function(err, object) { | ||
/// Get documents | ||
var documents = object.documents; | ||
// Get status object | ||
var status = documents[0]; | ||
// If no members set | ||
if(status["members"] == null || err != null) { | ||
// if we have a connection force close it | ||
if(connection != null) connection.close(); | ||
// Ensure we perform enough retries | ||
if(self.ensureUpRetries >= self.retries) { | ||
// Set that we are done | ||
done = true; | ||
// if we have a connection force close it | ||
if(connection != null) connection.close(); | ||
// Return error | ||
return callback(new Error("Operation Failure"), null); | ||
} else { | ||
// Execute function again | ||
setTimeout(ensureUpFunction, 1000); | ||
} | ||
} else { | ||
// Establish all healthy members
var healthyMembers = status.members.filter(function(element) { | ||
return element["health"] == 1 && [1, 2, 7].indexOf(element["state"]) != -1 | ||
}); | ||
var stateCheck = status["members"].filter(function(element, indexOf, array) { | ||
return element["state"] == 1; | ||
}); | ||
if(healthyMembers.length == status.members.length && stateCheck.length > 0) { | ||
// Set that we are done | ||
done = true; | ||
// if we have a connection force close it | ||
if(connection != null) connection.close(); | ||
// process.stdout.write("all members up! \n\n"); | ||
if(!self.up) process.stdout.write("all members up!\n\n") | ||
self.up = true; | ||
return callback(null, status); | ||
} else { | ||
// if we have a connection force close it | ||
if(connection != null) connection.close(); | ||
// Ensure we perform enough retries | ||
if(self.ensureUpRetries >= self.retries) { | ||
// Set that we are done | ||
done = true; | ||
// if we have a connection force close it | ||
if(connection != null) connection.close(); | ||
// Return error | ||
return callback(new Error("Operation Failure"), null); | ||
} else { | ||
// Execute function again | ||
setTimeout(ensureUpFunction, 1000); | ||
} | ||
} | ||
} | ||
}); | ||
} | ||
}); | ||
} | ||
// Check repl set get status | ||
connection.admin().command({"replSetGetStatus": 1}, function(err, object) { | ||
/// Get documents | ||
var documents = object.documents; | ||
// Get status object | ||
var status = documents[0]; | ||
} | ||
// If no members set | ||
if(status["members"] == null || err != null) { | ||
// Ensure we perform enough retries | ||
if(self.ensureUpRetries < self.retries) { | ||
setTimeout(function() { | ||
self.ensureUpRetries++; | ||
self.ensureUp(callback); | ||
}, 1000) | ||
} else { | ||
return callback(new Error("Operation Failure"), null); | ||
} | ||
} else { | ||
// Establish all healthy members
var healthyMembers = status.members.filter(function(element) { | ||
return element["health"] == 1 && [1, 2, 7].indexOf(element["state"]) != -1 | ||
}); | ||
var stateCheck = status["members"].filter(function(element, indexOf, array) { | ||
return element["state"] == 1; | ||
}); | ||
if(healthyMembers.length == status.members.length && stateCheck.length > 0) { | ||
// process.stdout.write("all members up! \n\n"); | ||
if(!self.up) process.stdout.write("all members up!\n\n") | ||
self.up = true; | ||
return callback(null, status); | ||
} else { | ||
// Ensure we perform enough retries | ||
if(self.ensureUpRetries < self.retries) { | ||
setTimeout(function() { | ||
self.ensureUpRetries++; | ||
self.ensureUp(callback); | ||
}, 1000) | ||
} else { | ||
return callback(new Error("Operation Failure"), null); | ||
} | ||
} | ||
} | ||
}); | ||
}); | ||
// Execute the first function call | ||
ensureUpFunction(); | ||
// // Write out the ensureUp | ||
// // process.stdout.write("."); | ||
// if(!self.up) process.stdout.write("."); | ||
// // Retry check for server up sleeping inbetween | ||
// self.retriedConnects = 0; | ||
// // Attemp to retrieve a connection | ||
// self.getConnection(function(err, connection) { | ||
// // If we have an error or no connection object retry | ||
// if(err != null || connection == null) { | ||
// // if we have a connection force close it | ||
// if(connection != null) connection.close(); | ||
// // Retry the connection | ||
// setTimeout(function() { | ||
// self.ensureUpRetries++; | ||
// self.ensureUp(callback); | ||
// }, 1000) | ||
// // Return | ||
// return; | ||
// } | ||
// | ||
// // Check repl set get status | ||
// connection.admin().command({"replSetGetStatus": 1}, function(err, object) { | ||
// /// Get documents | ||
// var documents = object.documents; | ||
// // Get status object | ||
// var status = documents[0]; | ||
// | ||
// // If no members set | ||
// if(status["members"] == null || err != null) { | ||
// // if we have a connection force close it | ||
// if(connection != null) connection.close(); | ||
// // Ensure we perform enough retries | ||
// if(self.ensureUpRetries < self.retries) { | ||
// setTimeout(function() { | ||
// self.ensureUpRetries++; | ||
// self.ensureUp(callback); | ||
// }, 1000) | ||
// } else { | ||
// // if we have a connection force close it | ||
// if(connection != null) connection.close(); | ||
// // Return error | ||
// return callback(new Error("Operation Failure"), null); | ||
// } | ||
// } else { | ||
// // Establish all health member | ||
// var healthyMembers = status.members.filter(function(element) { | ||
// return element["health"] == 1 && [1, 2, 7].indexOf(element["state"]) != -1 | ||
// }); | ||
// | ||
// var stateCheck = status["members"].filter(function(element, indexOf, array) { | ||
// return element["state"] == 1; | ||
// }); | ||
// | ||
// if(healthyMembers.length == status.members.length && stateCheck.length > 0) { | ||
// // if we have a connection force close it | ||
// if(connection != null) connection.close(); | ||
// // process.stdout.write("all members up! \n\n"); | ||
// if(!self.up) process.stdout.write("all members up!\n\n") | ||
// self.up = true; | ||
// return callback(null, status); | ||
// } else { | ||
// // if we have a connection force close it | ||
// if(connection != null) connection.close(); | ||
// // Ensure we perform enough retries | ||
// if(self.ensureUpRetries < self.retries) { | ||
// setTimeout(function() { | ||
// self.ensureUpRetries++; | ||
// self.ensureUp(callback); | ||
// }, 1000) | ||
// } else { | ||
// return callback(new Error("Operation Failure"), null); | ||
// } | ||
// } | ||
// } | ||
// }); | ||
// }); | ||
} | ||
@@ -428,16 +538,17 @@ | ||
Step( | ||
// Start all nodes | ||
function start() { | ||
var group = this.group(); | ||
// Start all nodes | ||
for(var i = 0; i < nodes.length; i++) { | ||
self.start(nodes[i], group()); | ||
var numberOfNodes = nodes.length; | ||
if(numberOfNodes == 0) return self.ensureUp(callback); | ||
// Restart all the number of nodes | ||
for(var i = 0; i < numberOfNodes; i++) { | ||
// Start the process | ||
self.start(nodes[i], function(err, result) { | ||
// Adjust the number of nodes we are starting | ||
numberOfNodes = numberOfNodes - 1; | ||
if(numberOfNodes === 0) { | ||
self.ensureUp(callback); | ||
} | ||
}, | ||
function finished() { | ||
self.ensureUp(callback); | ||
} | ||
) | ||
}); | ||
} | ||
} | ||
@@ -447,2 +558,6 @@ | ||
var self = this; | ||
// Function done | ||
var done = false; | ||
// Number of retries | ||
var numberOfRetries = self.retries; | ||
// Unpack callback and variables | ||
@@ -453,3 +568,3 @@ var args = Array.prototype.slice.call(arguments, 0); | ||
if(node == null) { | ||
if(node == null) { | ||
var keys = Object.keys(this.mongods); | ||
@@ -465,22 +580,35 @@ for(var i = 0; i < keys.length; i++) { | ||
} | ||
// Fire up the connection to check if we are running | ||
// var db = new Db('node-mongo-blog', new Server(host, port, {}), {native_parser:true}); | ||
var connection = new Db("replicaset_test", new Server(this.host, this.mongods[node]["port"], {})); | ||
connection.open(function(err, connection) { | ||
// We need to retry if we have not finished up the number of retries | ||
if(err != null && self.retriedConnects < self.retries) { | ||
// Sleep for a second then retry | ||
setTimeout(function() { | ||
// Update retries | ||
self.retriedConnects++; | ||
// Perform another reconnect
self.getConnection(node, callback); | ||
}, 1000) | ||
} else if(err != null && self.retriedConnects >= self.retries){ | ||
callback(new Error("Failed to reconnect"), null); | ||
} else { | ||
callback(null, connection); | ||
} | ||
}) | ||
// Get the node | ||
if(self.mongods[node] != null) { | ||
var intervalId = setInterval(function() { | ||
var connection = new Db("replicaset_test", new Server(self.host, self.mongods[node]["port"], {})); | ||
connection.open(function(err, db) { | ||
if(err == null && !done) { | ||
// Set done | ||
done = true; | ||
// Clear interval | ||
clearInterval(intervalId); | ||
// Callback as done | ||
return callback(null, connection); | ||
} else { | ||
// Close the connection | ||
if(connection != null) connection.close(); | ||
// Adjust the number of retries | ||
numberOfRetries = numberOfRetries - 1; | ||
// If we have no more retries fail | ||
if(numberOfRetries == 0) { | ||
// Set done | ||
done = true; | ||
// Clear interval | ||
clearInterval(intervalId); | ||
// Callback as done | ||
return callback(new Error("Timed out connecting to primary"), null); | ||
} | ||
} | ||
}); | ||
}, 1000); | ||
} else { | ||
callback(new Error("no primary node found to do stepDownPrimary"), null); | ||
} | ||
} | ||
@@ -491,15 +619,13 @@ | ||
var self = this; | ||
// Start up mongod process | ||
// debug("======================================================================================= starting process") | ||
// debug(self.mongods[node]["start"]) | ||
var mongodb = exec(self.mongods[node]["start"], | ||
function (error, stdout, stderr) { | ||
console.log('stdout: ' + stdout); | ||
console.log('stderr: ' + stderr); | ||
debug('stdout: ' + stdout); | ||
debug('stderr: ' + stderr); | ||
if (error !== null) { | ||
console.log('exec error: ' + error); | ||
debug('exec error: ' + error); | ||
} | ||
}); | ||
}); | ||
// Wait for a half a second then save the pids | ||
@@ -506,0 +632,0 @@ setTimeout(function() { |
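A recurring pattern in the manager rewrite above is polling with `setInterval` behind a `done` flag, so that late callbacks from earlier attempts can never invoke the caller twice. Here is a stripped-down sketch of that pattern, where `pollUntilReady` and `tryOnce` are placeholder names standing in for the real connection or `replSetGetStatus` attempt.

    // Sketch of the poll-until-ready pattern used by the rewritten manager.
    // tryOnce(cb) stands in for one connection/status attempt.
    function pollUntilReady(tryOnce, retries, callback) {
      var done = false;
      var intervalId = setInterval(function() {
        tryOnce(function(err, result) {
          if(done) return;                   // ignore late callbacks
          if(err == null) {
            done = true;
            clearInterval(intervalId);
            return callback(null, result);   // success
          }
          // Burn a retry and give up once they are exhausted
          retries = retries - 1;
          if(retries == 0) {
            done = true;
            clearInterval(intervalId);
            return callback(new Error("Timed out"), null);
          }
        });
      }, 1000);
    }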
@@ -37,2 +37,5 @@ var debug = require('util').debug, | ||
db_path: self.db_path, port: self.port, durable: self.durable, auth:self.auth}); | ||
// console.log("----------------------------------------------------------------------- start") | ||
// console.log(startCmd) | ||
@@ -122,3 +125,3 @@ exec(killall ? 'killall mongod' : '', function(err, stdout, stderr) { | ||
// Create boot command | ||
var startCmd = "mongod --logpath '" + options['log_path'] + "' " + | ||
var startCmd = "mongod --noprealloc --logpath '" + options['log_path'] + "' " + | ||
" --dbpath " + options['db_path'] + " --port " + options['port'] + " --fork"; | ||
@@ -125,0 +128,0 @@ startCmd = options['durable'] ? startCmd + " --dur" : startCmd; |
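The visible change to the boot command here is the added `--noprealloc` flag. Reproducing the string assembly with hypothetical option values makes the resulting command easier to read; the paths and port below are placeholders, not values from the repository.

    // Same assembly as above, with hypothetical option values.
    var options = { log_path: '/tmp/mongod.log', db_path: '/tmp/rs-0', port: 30000, durable: false };
    var startCmd = "mongod --noprealloc --logpath '" + options['log_path'] + "' " +
      " --dbpath " + options['db_path'] + " --port " + options['port'] + " --fork";
    startCmd = options['durable'] ? startCmd + " --dur" : startCmd;
    // => mongod --noprealloc --logpath '/tmp/mongod.log'  --dbpath /tmp/rs-0 --port 30000 --fork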
@@ -17,2 +17,3 @@ var nodeunit = require('../deps/nodeunit'), | ||
{dir: __dirname + "/../test/gridstore", path: "/test/gridstore/"}, | ||
{dir: __dirname + "/../test/connection", path: "/test/connection/"}, | ||
{dir: __dirname + "/../test/bson", path: "/test/bson/"}]; | ||
@@ -19,0 +20,0 @@ |