Comparing version 0.9.7-2 to 0.9.7-2-1
@@ -15,3 +15,3 @@ GLOBAL.DEBUG = true; | ||
db.open(function(err, db) { | ||
db.dropDatabase(function(err, result){ | ||
db.dropDatabase(function(err, result){ | ||
db.dropCollection('test', function(err, result) { | ||
@@ -18,0 +18,0 @@ db.createCollection('test', function(err, collection) { |
@@ -84,3 +84,3 @@ /** | ||
for (var index = 0, len = this.id.length; index < len; index++) { | ||
value = BinaryParser.toByte(this.id.substr(index, 1)); | ||
value = BinaryParser.toByte(this.id[index]); | ||
number = value <= 15 | ||
@@ -87,0 +87,0 @@ ? '0' + value.toString(16) |
@@ -68,3 +68,2 @@ var Connection = require('./connection').Connection, | ||
Server.prototype.close = function(callback) { | ||
// console.log("============================================================ server.close") | ||
// Remove all local listeners | ||
@@ -86,8 +85,3 @@ this.removeAllListeners(); | ||
Server.prototype.send = function(command) { | ||
// this.internalConnection.send(command); | ||
} | ||
Server.prototype.isConnected = function() { | ||
// console.log("-------------------------- isConnected :: " + this.connectionPool.isConnected()) | ||
return this.connectionPool && this.connectionPool.isConnected(); | ||
@@ -101,3 +95,2 @@ } | ||
Server.prototype.connect = function(dbInstance, options, callback) { | ||
// console.log("==================================================== connecting to :: " + this.host + ":" + this.port) | ||
if('function' === typeof options) callback = options, options = {}; | ||
@@ -119,5 +112,2 @@ if(options == null) options = {}; | ||
var server = this; | ||
// console.log("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@") | ||
// console.log(options.eventReceiver) | ||
// Let's us override the main receiver of events | ||
@@ -130,5 +120,2 @@ var eventReceiver = options.eventReceiver != null ? options.eventReceiver : this; | ||
// Ensure current connection pool is closed if there is one | ||
// if(this.connectionPool != null) this.connectionPool.stop(); | ||
// Set server state to connecting | ||
@@ -138,8 +125,2 @@ this._serverState = 'connecting'; | ||
dbInstance.slaveOk = this.slaveOk ? this.slaveOk : dbInstance.slaveOk; | ||
// If we have an existing connection pool remove it | ||
// if(server.connectionPool != null) { | ||
// console.log("============================================= stopping connection pool") | ||
// server.removeAllEventListeners(); | ||
// server.connectionPool.stop(); | ||
// } | ||
// Create connection Pool instance with the current BSON serializer | ||
@@ -182,3 +163,2 @@ var connectionPool = new ConnectionPool(this.host, this.port, this.poolSize, dbInstance.bson_deserializer, this.socketOptions); | ||
connectionPool.on("poolReady", function() { | ||
// console.log("===================================== server:poolReady :: ") | ||
// Create db command and Add the callback to the list of callbacks by the request id (mapping outgoing messages to correct callbacks) | ||
@@ -200,30 +180,26 @@ var db_command = DbCommand.NcreateIsMasterCommand(dbInstance, dbInstance.databaseName); | ||
connectionPool.on("message", function(message) { | ||
// Do this in a process tick | ||
process.nextTick(function() { | ||
// Attempt to parse the message | ||
try { | ||
// Create a new mongo reply | ||
var mongoReply = new MongoReply() | ||
// Parse the header | ||
mongoReply.parseHeader(message, connectionPool.bson) | ||
// If message size is not the same as the buffer size | ||
// something went terribly wrong somewhere | ||
if(mongoReply.messageLength != message.length) { | ||
// // Force close the pool | ||
// if(connectionPool.isConnected()) server.close(); | ||
// Emit the error | ||
eventReceiver.emit("error", new Error("bson length is different from message length"), server); | ||
// Remove all listeners | ||
server.removeAllListeners(); | ||
} else { | ||
// Attempt to locate a callback instance | ||
for(var i = 0; i < server.dbInstances.length; i++) { | ||
var dbInstanceObject = server.dbInstances[i]; | ||
// Locate the callback info | ||
var callbackInfo = dbInstanceObject._findHandler(mongoReply.responseTo.toString()); | ||
// Attempt to parse the message | ||
try { | ||
// Create a new mongo reply | ||
var mongoReply = new MongoReply() | ||
// Parse the header | ||
mongoReply.parseHeader(message, connectionPool.bson) | ||
// If message size is not the same as the buffer size | ||
// something went terribly wrong somewhere | ||
if(mongoReply.messageLength != message.length) { | ||
// Emit the error | ||
eventReceiver.emit("error", new Error("bson length is different from message length"), server); | ||
// Remove all listeners | ||
server.removeAllListeners(); | ||
} else { | ||
// Attempt to locate a callback instance | ||
for(var i = 0; i < server.dbInstances.length; i++) { | ||
var dbInstanceObject = server.dbInstances[i]; | ||
// Locate the callback info | ||
var callbackInfo = dbInstanceObject._findHandler(mongoReply.responseTo.toString()); | ||
// Only execute callback if we have a caller | ||
if(typeof callbackInfo.callback === 'function' && Array.isArray(callbackInfo.info.chained)) { | ||
// Parse the body | ||
mongoReply.parseBody(message, connectionPool.bson, callbackInfo.info.raw); | ||
// Only execute callback if we have a caller | ||
if(typeof callbackInfo.callback === 'function' && Array.isArray(callbackInfo.info.chained)) { | ||
// Parse the body | ||
mongoReply.parseBody(message, connectionPool.bson, callbackInfo.info.raw, function(err) { | ||
// Get the callback instance | ||
@@ -248,3 +224,3 @@ var callbackInstance = dbInstanceObject._removeHandler(mongoReply.responseTo); | ||
var foundChainedMethod = false; | ||
// If we have more chained Commands get the next one and add the results to it | ||
@@ -262,3 +238,3 @@ for(var i = 0; i < chainedIds.length; i++) { | ||
} | ||
// If we don't have any more chained methods we should be done and should return the | ||
@@ -270,6 +246,7 @@ // first result of the chain of methods | ||
} | ||
} | ||
} else if(typeof callbackInfo.callback === 'function') { | ||
// Parse the body | ||
mongoReply.parseBody(message, connectionPool.bson, callbackInfo.info.raw); | ||
} | ||
}); | ||
} else if(typeof callbackInfo.callback === 'function') { | ||
// Parse the body | ||
mongoReply.parseBody(message, connectionPool.bson, callbackInfo.info.raw, function(err) { | ||
// Get the callback instance | ||
@@ -283,13 +260,15 @@ var callbackInstance = dbInstanceObject._removeHandler(mongoReply.responseTo); | ||
} | ||
// Trigger the callback | ||
callbackInstance.callback(null, mongoReply, callbackInstance.info.connection); | ||
} | ||
callbackInstance.callback(null, mongoReply, callbackInstance.info.connection); | ||
}); | ||
} | ||
} | ||
} catch (err) { | ||
// Rethrow error | ||
throw err | ||
} | ||
}) | ||
} | ||
} | ||
} catch (err) { | ||
// Throw error in next tick | ||
process.nextTick(function() { | ||
throw err; | ||
}) | ||
} | ||
}); | ||
@@ -299,7 +278,4 @@ | ||
connectionPool.on("timeout", function(err) { | ||
console.log("===================================== server:timeout :: ") | ||
// Force close the pool | ||
connectionPool.stop(); | ||
// Set timeout | ||
// server._timeout = true; | ||
// Keep the db we don't want to emit an error on | ||
@@ -310,3 +286,2 @@ var filterDb = null; | ||
if(typeof callback === 'function') { | ||
// console.log("===================================== server:timeout :: 0") | ||
// ensure no callbacks get called twice | ||
@@ -316,29 +291,7 @@ var internalCallback = callback; | ||
// Perform callback | ||
// process.nextTick(function() { | ||
internalCallback(err, null); | ||
// }); | ||
internalCallback(err, null); | ||
// Emit errors but filter out current db | ||
filterDb = server.dbInstance; | ||
// } else { | ||
// console.log("===================================== server:timeout :: 1") | ||
// Emit timeout event | ||
// if(eventEmitterIsDb) { | ||
// // console.log("===================================== server:timeout :: 1:1") | ||
// // Issue close across all the db instances registered in server instance | ||
// for(var i = 0; i < server.dbInstances.length; i++) { | ||
// // process.nextTick(function() { | ||
// server.dbInstances[i].emit("timeout", err, server); | ||
// // }) | ||
// } | ||
// // Remove all listeners | ||
// server.removeAllListeners() | ||
// } else { | ||
// console.log("===================================== server:timeout :: 1:2") | ||
// console.dir(eventReceiver) | ||
// process.nextTick(function() { | ||
// eventReceiver.emit("timeout", err, server); | ||
// }); | ||
// Remove all listeners | ||
server.removeAllListeners() | ||
// } | ||
// Remove all listeners | ||
server.removeAllListeners() | ||
} | ||
@@ -352,5 +305,2 @@ | ||
connectionPool.on("error", function(message) { | ||
// console.log("===================================== server:error :: " + message) | ||
// console.dir(message); | ||
// console.log(callback) | ||
// Remove all connectionPool Listeners | ||
@@ -369,4 +319,2 @@ connectionPool.removeAllEventListeners(); | ||
if(typeof callback === 'function') { | ||
// console.log("===================================== server:error:1") | ||
// console.log(callback.toString()) | ||
// Set server state to connected | ||
@@ -382,22 +330,2 @@ server._serverState = 'disconnected'; | ||
} else if(server._serverState !== 'disconnected') { | ||
// console.log("===================================== server:error:2") | ||
// Emit event | ||
// if(eventEmitterIsDb) { | ||
// // console.log("===================================== eventEmitterIsDb = " + eventEmitterIsDb) | ||
// // console.dir(eventReceiver) | ||
// | ||
// // console.log("===================================== server:error:3") | ||
// // Issue error across all the db instances registered in server instance | ||
// if(server._serverState != 'disconnected') { | ||
// for(var i = 0; i < server.dbInstances.length; i++) { | ||
// server.dbInstances[i].emit("error", new Error(message.err), server); | ||
// } | ||
// } | ||
// } else { | ||
// console.log("===================================== server:error:4") | ||
// if(server._serverState != 'disconnected') { | ||
// eventReceiver.emit("error", new Error(message.err), server); | ||
// } | ||
// } | ||
// Set server instance to disconnected state | ||
@@ -413,3 +341,2 @@ server._serverState = !connectionPool.isConnected() ? 'disconnected' : server._serverState; | ||
connectionPool.on("close", function() { | ||
// console.log("===================================== server:close :: " + (typeof callback === 'function')) | ||
// Force close the pool | ||
@@ -439,13 +366,6 @@ connectionPool.stop(); | ||
connectionPool.on("parseError", function(message) { | ||
// console.log("===================================== server:parseError") | ||
// Force close the pool | ||
// if(connectionPool.isConnected()) server.close(); | ||
// Force close the pool | ||
connectionPool.stop(); | ||
// Emit error across all servers | ||
_emitAcrossAllDbInstances(server, null, "error", message, server); | ||
// Emit error | ||
// server.emit("error", message, server); | ||
}); | ||
@@ -452,0 +372,0 @@ |
@@ -41,20 +41,77 @@ var Long = require('../goog/math/long').Long, | ||
MongoReply.prototype.parseBody = function(binary_reply, bson, raw) { | ||
MongoReply.prototype.parseBody = function(binary_reply, bson, raw, callback) { | ||
raw = raw == null ? false : raw; | ||
// Let's unpack all the bson document, deserialize them and store them | ||
for(var object_index = 0; object_index < this.numberReturned; object_index++) { | ||
// Read the size of the bson object | ||
var bsonObjectSize = binary_reply[this.index] | binary_reply[this.index + 1] << 8 | binary_reply[this.index + 2] << 16 | binary_reply[this.index + 3] << 24; | ||
var docLimitSize = 1024*10; | ||
// If our message length is very long, let's switch to process.nextTick for messages | ||
if(this.messageLength > docLimitSize) { | ||
var batches = 1; | ||
var batchSize = this.numberReturned; | ||
var overflow = 0; | ||
// If we are storing the raw responses to pipe straight through | ||
if(raw) { | ||
// Deserialize the object and add to the documents array | ||
this.documents.push(binary_reply.slice(this.index, this.index + bsonObjectSize)); | ||
} else { | ||
// Deserialize the object and add to the documents array | ||
this.documents.push(bson.BSON.deserialize(binary_reply.slice(this.index, this.index + bsonObjectSize))); | ||
} | ||
// Adjust binary index to point to next block of binary bson data | ||
this.index = this.index + bsonObjectSize; | ||
} | ||
// Just walk down until we get a positive number >= 1 | ||
for(var i = 50; i > 0; i--) { | ||
if((this.numberReturned/i) >= 1) { | ||
batchSize = i; | ||
batches = Math.floor(this.numberReturned/i); | ||
overflow = this.numberReturned%i; | ||
break; | ||
} | ||
} | ||
// Actual main creator of the processFunction setting internal state to control the flow | ||
var parseFunction = function(_self, _binary_reply, _batchSize, _numberReturned) { | ||
var object_index = 0; | ||
// Internal loop process that will use nextTick to ensure we yield some time | ||
var processFunction = function() { | ||
// Iterate over the batch | ||
for(var i = 0; i < _batchSize; i++) { | ||
// Update number of docs parsed | ||
object_index = object_index + 1; | ||
if(object_index <= _numberReturned) { | ||
// Read the size of the bson object | ||
var bsonObjectSize = _binary_reply[_self.index] | _binary_reply[_self.index + 1] << 8 | _binary_reply[_self.index + 2] << 16 | _binary_reply[_self.index + 3] << 24; | ||
// If we are storing the raw responses to pipe straight through | ||
if(raw) { | ||
// Deserialize the object and add to the documents array | ||
_self.documents.push(binary_reply.slice(_self.index, _self.index + bsonObjectSize)); | ||
} else { | ||
// Deserialize the object and add to the documents array | ||
_self.documents.push(bson.BSON.deserialize(binary_reply.slice(_self.index, _self.index + bsonObjectSize))); | ||
} | ||
// Adjust binary index to point to next block of binary bson data | ||
_self.index = _self.index + bsonObjectSize; | ||
} | ||
} | ||
// If we hav more documents process NextTick | ||
if(object_index < _numberReturned) { | ||
process.nextTick(processFunction); | ||
} else { | ||
callback(null); | ||
} | ||
} | ||
// Return the process function | ||
return processFunction; | ||
}(this, binary_reply, batchSize, this.numberReturned)(); | ||
} else { | ||
// Let's unpack all the bson documents, deserialize them and store them | ||
for(var object_index = 0; object_index < this.numberReturned; object_index++) { | ||
// Read the size of the bson object | ||
var bsonObjectSize = binary_reply[this.index] | binary_reply[this.index + 1] << 8 | binary_reply[this.index + 2] << 16 | binary_reply[this.index + 3] << 24; | ||
// If we are storing the raw responses to pipe straight through | ||
if(raw) { | ||
// Deserialize the object and add to the documents array | ||
this.documents.push(binary_reply.slice(this.index, this.index + bsonObjectSize)); | ||
} else { | ||
// Deserialize the object and add to the documents array | ||
this.documents.push(bson.BSON.deserialize(binary_reply.slice(this.index, this.index + bsonObjectSize))); | ||
} | ||
// Adjust binary index to point to next block of binary bson data | ||
this.index = this.index + bsonObjectSize; | ||
} | ||
callback(null); | ||
} | ||
} | ||
@@ -61,0 +118,0 @@ |
{ "name" : "mongodb" | ||
, "description" : "A node.js driver for MongoDB" | ||
, "keywords" : ["mongodb", "mongo", "driver", "db"] | ||
, "version" : "0.9.7-2" | ||
, "version" : "0.9.7-2-1" | ||
, "author" : "Christian Amor Kvalheim <christkv@gmail.com>" | ||
@@ -6,0 +6,0 @@ , "contributors" : [ "Aaron Heckmann", |
@@ -951,3 +951,3 @@ var testCase = require('../deps/nodeunit').testCase, | ||
}, | ||
shouldCorrectlyExecuteEnsureIndexWithNoCallback : function(test) { | ||
@@ -960,3 +960,3 @@ var docs = []; | ||
} | ||
// Create collection | ||
@@ -969,3 +969,3 @@ client.createCollection('shouldCorrectlyExecuteEnsureIndexWithNoCallback', function(err, collection) { | ||
test.equal(null, err); | ||
// Find with sort | ||
@@ -988,3 +988,3 @@ collection.find().sort(['createdAt', 'asc']).toArray(function(err, items) { | ||
} | ||
// Create collection | ||
@@ -999,3 +999,3 @@ client.createCollection('shouldCorrectlyInsert5000RecordsWithDateAndSortCorrectlyWithIndex', function(err, collection) { | ||
test.equal(null, err); | ||
// Find with sort | ||
@@ -1019,3 +1019,3 @@ collection.find().sort(['createdAt', 'asc']).toArray(function(err, items) { | ||
} | ||
// Create collection | ||
@@ -1045,2 +1045,38 @@ client.createCollection('Should_correctly_rewind_and_restart_cursor', function(err, collection) { | ||
'Should correctly execute count on cursor' : function(test) { | ||
var docs = []; | ||
for(var i = 0; i < 1000; i++) { | ||
var d = new Date().getTime() + i*1000; | ||
docs[i] = {'a':i, createdAt:new Date(d)}; | ||
} | ||
// Create collection | ||
client.createCollection('Should_correctly_execute_count_on_cursor', function(err, collection) { | ||
test.equal(null, err); | ||
// insert all docs | ||
collection.insert(docs, {safe:true}, function(err, result) { | ||
test.equal(null, err); | ||
var total = 0; | ||
// Create a cursor for the content | ||
var cursor = collection.find({}); | ||
cursor.count(function(err, c) { | ||
// Ensure each returns all documents | ||
cursor.each(function(err, item) { | ||
if(item != null) { | ||
total++; | ||
} else { | ||
cursor.count(function(err, c) { | ||
test.equal(1000, c); | ||
test.equal(1000, total); | ||
test.done(); | ||
}) | ||
} | ||
}); | ||
}) | ||
}) | ||
}); | ||
}, | ||
// run this last | ||
@@ -1047,0 +1083,0 @@ noGlobalsLeaked: function(test) { |
@@ -169,3 +169,32 @@ var noReplicasetStart = process.env['NO_REPLICASET_START'] != null ? true : false; | ||
}, | ||
shouldCorrectlyQuerySecondaries : function(test) { | ||
// debug("=========================================== shouldCorrectlyQuerySecondaries") | ||
var self = this; | ||
// Replica configuration | ||
var replSet = new ReplSetServers( [ | ||
new Server( RS.host, RS.ports[0], { auto_reconnect: true } ), | ||
new Server( RS.host, RS.ports[1], { auto_reconnect: true } ), | ||
new Server( RS.host, RS.ports[2], { auto_reconnect: true } ), | ||
], | ||
{rs_name:RS.name, read_secondary:false} | ||
); | ||
// Insert some data | ||
var db = new Db('integration_test_', replSet); | ||
db.open(function(err, p_db) { | ||
if(err != null) debug("shouldReadPrimary :: " + inspect(err)); | ||
// Ensure the checkoutReader gives us the actual writer object | ||
var reader = replSet.checkoutReader(); | ||
var writer = replSet.checkoutWriter(); | ||
// Ensure the connections are the same | ||
test.equal(reader.socketOptions.host, writer.socketOptions.host); | ||
test.equal(reader.socketOptions.port, writer.socketOptions.port); | ||
// Close connection to Spain | ||
db.close(); | ||
test.done(); | ||
}) | ||
}, | ||
noGlobalsLeaked : function(test) { | ||
@@ -172,0 +201,0 @@ var leaks = gleak.detectNew(); |
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
1892257
222
30777
226
17