mongodb-core
Advanced tools
Comparing version 1.3.7 to 1.3.8
@@ -0,1 +1,6 @@ | ||
1.3.8 2016-03-18 | ||
---------------- | ||
- Implements the SDAM monitoring specification. | ||
- Fix issue where cursor would error out and not be buffered when primary is not connected. | ||
1.3.7 2016-03-16 | ||
@@ -2,0 +7,0 @@ ---------------- |
@@ -74,5 +74,65 @@ "use strict"; | ||
, index: 0 | ||
, topologyDescription: null | ||
} | ||
} | ||
/** | ||
* Emit event if it exists | ||
* @method | ||
*/ | ||
function emitSDAMEvent(self, event, description) { | ||
if(self.listeners(event).length > 0) { | ||
self.emit(event, description); | ||
} | ||
} | ||
/** | ||
* Is there a secondary connected | ||
* @method | ||
* @return {boolean} | ||
*/ | ||
State.prototype.resetDescription = function() { | ||
this.s.topologyDescription = { | ||
"topologyType": "Sharded", | ||
"servers": [] | ||
} | ||
} | ||
function emitTopologyDescriptionChanged(self, state) { | ||
if(self.listeners('topologyDescriptionChanged').length > 0 && state) { | ||
var state = state.s; | ||
// Generate description | ||
var description = { | ||
topologyType: 'Sharded', | ||
servers: [] | ||
} | ||
// Add all the secondaries | ||
description.servers = description.servers.concat(state.connectedServers.map(function(x) { | ||
var description = x.getDescription(); | ||
description.type = 'Mongos'; | ||
return description; | ||
})); | ||
description.servers = description.servers.concat(state.disconnectedServers.map(function(x) { | ||
var description = x.getDescription(); | ||
description.type = 'Unknown'; | ||
return description; | ||
})); | ||
// Create the result | ||
var result = { | ||
topologyId: self.id, | ||
previousDescription: state.topologyDescription, | ||
newDescription: description | ||
}; | ||
// Emit the topologyDescription change | ||
self.emit('topologyDescriptionChanged', result); | ||
// Set the new description | ||
state.topologyDescription = description; | ||
} | ||
} | ||
// | ||
@@ -421,2 +481,6 @@ // A Mongos connected | ||
opts.emitError = true; | ||
// Set that server is in a topology | ||
opts.inTopology = true; | ||
opts.topologyId = self.s.id; | ||
opts.monitoring = true; | ||
// Create a new Server | ||
@@ -426,2 +490,8 @@ self.s.mongosState.disconnected(new Server(opts)); | ||
// Reset the replState | ||
this.s.mongosState.resetDescription(); | ||
// Emit the topology opening event | ||
emitSDAMEvent(this, 'topologyOpening', { topologyId: this.s.id }); | ||
// Get the disconnected servers | ||
@@ -439,3 +509,5 @@ var servers = self.s.mongosState.disconnectedServers(); | ||
// Remove any non used handlers | ||
['error', 'close', 'timeout', 'connect', 'message', 'parseError'].forEach(function(e) { | ||
['error', 'close', 'timeout', 'connect', 'message', 'parseError', | ||
'serverOpening', 'serverDescriptionChanged', 'serverHeartbeatStarted', | ||
'serverHeartbeatSucceeded', 'serverHearbeatFailed', 'serverClosed'].forEach(function(e) { | ||
server.removeAllListeners(e); | ||
@@ -451,5 +523,21 @@ }); | ||
// SDAM Monitoring events | ||
server.on('serverOpening', function(e) { self.emit('serverOpening', e); }); | ||
server.on('serverDescriptionChanged', function(e) { self.emit('serverDescriptionChanged', e); }); | ||
server.on('serverHeartbeatStarted', function(e) { self.emit('serverHeartbeatStarted', e); }); | ||
server.on('serverHeartbeatSucceeded', function(e) { self.emit('serverHeartbeatSucceeded', e); }); | ||
server.on('serverHearbeatFailed', function(e) { self.emit('serverHearbeatFailed', e); }); | ||
server.on('serverClosed', function(e) { self.emit('serverClosed', e); }); | ||
if(self.s.logger.isInfo()) self.s.logger.info(f('connecting to server %s', server.name)); | ||
// Attempt to connect | ||
server.connect(); | ||
// Execute the connect | ||
var execute = function(_server) { | ||
process.nextTick(function() { | ||
_server.connect(); | ||
}); | ||
} | ||
// Connect | ||
execute(server); | ||
} | ||
@@ -476,2 +564,4 @@ } | ||
this.s.state = DESTROYED; | ||
// Emit toplogy closing event | ||
emitSDAMEvent(this, 'topologyClosed', { topologyId: this.s.id }); | ||
// Emit close | ||
@@ -879,2 +969,5 @@ if(emitClose && self.listeners('close').length > 0) self.emit('close', self); | ||
// Emit topology changed event | ||
emitTopologyDescriptionChanged(self, state.mongosState); | ||
// Signal server left | ||
@@ -896,2 +989,5 @@ self.emit('left', 'mongos', server); | ||
// Emit topology changed event | ||
emitTopologyDescriptionChanged(self, state.mongosState); | ||
// Signal server left | ||
@@ -912,2 +1008,5 @@ self.emit('left', 'mongos', server); | ||
// Emit topology changed event | ||
emitTopologyDescriptionChanged(self, state.mongosState); | ||
// Signal server left | ||
@@ -954,3 +1053,3 @@ self.emit('left', 'mongos', server); | ||
state.fullsetup = true; | ||
self.emit('fullsetup'); | ||
self.emit('fullsetup', self); | ||
} | ||
@@ -963,5 +1062,8 @@ | ||
state.all = true; | ||
self.emit('all'); | ||
self.emit('all', self); | ||
} | ||
// Emit topology changed event | ||
emitTopologyDescriptionChanged(self, state.mongosState); | ||
// Set connected | ||
@@ -968,0 +1070,0 @@ if(state.state == DISCONNECTED) { |
@@ -39,2 +39,4 @@ "use strict"; | ||
this.secondaryOnlyConnectionAllowed = options.secondaryOnlyConnectionAllowed; | ||
// Description of the Replicaset | ||
this.replicasetDescription = null; | ||
} | ||
@@ -47,2 +49,74 @@ | ||
*/ | ||
State.prototype.resetDescription = function() { | ||
this.replicasetDescription = { | ||
"topologyType": "Unknown", | ||
"servers": [] | ||
} | ||
} | ||
function emitTopologyDescriptionChanged(self) { | ||
if(self.replSet.listeners('topologyDescriptionChanged').length > 0) { | ||
var topology = 'Unknown'; | ||
var setName = self.setName; | ||
if(self.isPrimaryConnected() && self.isSecondaryConnected()) { | ||
topology = 'ReplicaSetWithPrimary'; | ||
} else if(!self.isPrimaryConnected() && self.isSecondaryConnected()) { | ||
topology = 'ReplicaSetNoPrimary'; | ||
} | ||
// Generate description | ||
var description = { | ||
topologyType: topology, | ||
setName: setName, | ||
servers: [] | ||
} | ||
// Add the primary to the list | ||
if(self.isPrimaryConnected()) { | ||
var desc = self.primary.getDescription(); | ||
desc.type = 'RSPrimary'; | ||
description.servers.push(desc); | ||
} | ||
// Add all the secondaries | ||
description.servers = description.servers.concat(self.secondaries.map(function(x) { | ||
var description = x.getDescription(); | ||
description.type = 'RSSecondary'; | ||
return description; | ||
})); | ||
// Add all the arbiters | ||
description.servers = description.servers.concat(self.arbiters.map(function(x) { | ||
var description = x.getDescription(); | ||
return description; | ||
})); | ||
// Add all the passives | ||
description.servers = description.servers.concat(self.passives.map(function(x) { | ||
var description = x.getDescription(); | ||
description.type = 'RSSecondary'; | ||
return description; | ||
})); | ||
// Create the result | ||
var result = { | ||
topologyId: self.id, | ||
previousDescription: self.replicasetDescription, | ||
newDescription: description | ||
}; | ||
// Emit the topologyDescription change | ||
self.replSet.emit('topologyDescriptionChanged', result); | ||
// Set the new description | ||
self.replicasetDescription = description; | ||
} | ||
} | ||
/** | ||
* Is there a secondary connected | ||
* @method | ||
* @return {boolean} | ||
*/ | ||
State.prototype.isSecondaryConnected = function() { | ||
@@ -338,3 +412,3 @@ for(var i = 0; i < this.secondaries.length; i++) { | ||
var add = function(list, server) { | ||
var add = function(self, list, server) { | ||
// Check if the server is a secondary at the moment | ||
@@ -345,3 +419,6 @@ for(var i = 0; i < list.length; i++) { | ||
// Add serer to list | ||
list.push(server); | ||
// Return true | ||
return true; | ||
@@ -356,3 +433,3 @@ } | ||
State.prototype.addSecondary = function(server) { | ||
return add(this.secondaries, server); | ||
return add(this, this.secondaries, server); | ||
} | ||
@@ -366,3 +443,3 @@ | ||
State.prototype.addArbiter = function(server) { | ||
return add(this.arbiters, server); | ||
return add(this, this.arbiters, server); | ||
} | ||
@@ -376,3 +453,3 @@ | ||
State.prototype.addPassive = function(server) { | ||
return add(this.passives, server); | ||
return add(this, this.passives, server); | ||
} | ||
@@ -424,5 +501,6 @@ | ||
if(self.logger.isInfo()) self.logger.info(f('[%s] removing %s from set', self.id, ismaster.me)); | ||
self.replSet.emit('left', self.remove(server), server); | ||
self.replSet.emit('left', result, server); | ||
} | ||
emitTopologyDescriptionChanged(this); | ||
return false; | ||
@@ -442,2 +520,3 @@ } | ||
self.replSet.emit('error', new MongoError("provided setName for Replicaset Connection does not match setName found in server seedlist")); | ||
emitTopologyDescriptionChanged(this); | ||
return false; | ||
@@ -474,2 +553,5 @@ } | ||
// Emit the description change | ||
emitTopologyDescriptionChanged(this); | ||
// We are connected | ||
@@ -488,2 +570,3 @@ if(self.state == CONNECTING) { | ||
self.replSet.emit('joined', 'arbiter', server); | ||
emitTopologyDescriptionChanged(this); | ||
return true; | ||
@@ -506,2 +589,3 @@ }; | ||
emitTopologyDescriptionChanged(this); | ||
return true; | ||
@@ -517,2 +601,7 @@ }; | ||
// Is this the primary right now | ||
if(self.primary && self.primary.name == server.name) { | ||
self.primary = null; | ||
} | ||
if(self.secondaryOnlyConnectionAllowed && self.state == CONNECTING) { | ||
@@ -523,2 +612,3 @@ self.state = CONNECTED; | ||
emitTopologyDescriptionChanged(this); | ||
return true; | ||
@@ -525,0 +615,0 @@ }; |
@@ -146,2 +146,4 @@ "use strict"; | ||
, haInterval: options.haInterval || 10000 | ||
// Current haInterval | ||
, currentHaInterval: options.haInterval || 10000 | ||
// Are we running in debug mode | ||
@@ -392,3 +394,8 @@ , debug: typeof options.debug == 'boolean' ? options.debug : false | ||
// We have a no master error, immediately refresh the view of the replicaset | ||
if(notMasterError(r) || notMasterError(err)) replicasetInquirer(self, self.s, self.s.highAvailabilityProcessRunning)(); | ||
if(notMasterError(r) || notMasterError(err)) { | ||
// Set he current interval to minHeartbeatFrequencyMS | ||
self.s.currentHaInterval = self.s.minHeartbeatFrequencyMS; | ||
// Attempt to locate the current master immediately | ||
replicasetInquirer(self, self.s, true)(); | ||
} | ||
// Return the result | ||
@@ -632,2 +639,12 @@ callback(err, r); | ||
/** | ||
* Emit event if it exists | ||
* @method | ||
*/ | ||
function emitSDAMEvent(self, event, description) { | ||
if(self.listeners(event).length > 0) { | ||
self.emit(event, description); | ||
} | ||
} | ||
/** | ||
* Initiate server connect | ||
@@ -649,2 +666,5 @@ * @method | ||
// Reset the replState | ||
this.s.replState.resetDescription(); | ||
// For all entries in the seedlist build a server instance | ||
@@ -663,2 +683,5 @@ this.s.seedlist.forEach(function(e) { | ||
opts.monitoring = true; | ||
opts.topologyId = self.s.id; | ||
// Server is in topology | ||
opts.inTopology = true; | ||
// Set up tags if any | ||
@@ -678,2 +701,5 @@ if(self.s.tag) opts.tag = self.s.tag; | ||
// Emit the topology opening event | ||
emitSDAMEvent(this, 'topologyOpening', { topologyId: this.s.id }); | ||
// Attempt to connect to all the servers | ||
@@ -692,2 +718,10 @@ while(this.s.disconnectedServers.length > 0) { | ||
// SDAM Monitoring events | ||
server.on('serverOpening', function(e) { self.emit('serverOpening', e); }); | ||
server.on('serverDescriptionChanged', function(e) { self.emit('serverDescriptionChanged', e); }); | ||
server.on('serverHeartbeatStarted', function(e) { self.emit('serverHeartbeatStarted', e); }); | ||
server.on('serverHeartbeatSucceeded', function(e) { self.emit('serverHeartbeatSucceeded', e); }); | ||
server.on('serverHearbeatFailed', function(e) { self.emit('serverHearbeatFailed', e); }); | ||
server.on('serverClosed', function(e) { self.emit('serverClosed', e); }); | ||
// Ensure we schedule the opening of new socket | ||
@@ -722,3 +756,3 @@ // on separate ticks of the event loop | ||
&& options.readPreference.equals(ReadPreference.primary)) { | ||
return this.s.replState.isSecondaryConnected() || this.s.replState.isPrimaryConnected(); | ||
return this.s.replState.isPrimaryConnected(); | ||
} | ||
@@ -767,4 +801,14 @@ | ||
server.on('serverOpening', function(e) { self.emit('serverOpening', e); }); | ||
server.on('serverDescriptionChanged', function(e) { self.emit('serverDescriptionChanged', e); }); | ||
server.on('serverHeartbeatStarted', function(e) { self.emit('serverHeartbeatStarted', e); }); | ||
server.on('serverHeartbeatSucceeded', function(e) { self.emit('serverHeartbeatSucceeded', e); }); | ||
server.on('serverHearbeatFailed', function(e) { self.emit('serverHearbeatFailed', e); }); | ||
server.on('serverClosed', function(e) { self.emit('serverClosed', e); }); | ||
// Clear out any listeners | ||
var events = ['timeout', 'error', 'close', 'joined', 'left']; | ||
var events = ['timeout', 'error', 'close', 'joined', 'left', | ||
'serverOpening', 'serverDescriptionChanged', 'serverHeartbeatStarted', | ||
'serverHeartbeatSucceeded', 'serverHearbeatFailed', 'serverClosed']; | ||
events.forEach(function(e) { | ||
@@ -792,4 +836,9 @@ self.removeAllListeners(e); | ||
// Emit toplogy closing event | ||
emitSDAMEvent(this, 'topologyClosed', { topologyId: this.s.id }); | ||
// Clear out any listeners | ||
var events = ['timeout', 'error', 'close', 'joined', 'left']; | ||
var events = ['timeout', 'error', 'close', 'joined', 'left', | ||
'serverOpening', 'serverDescriptionChanged', 'serverHeartbeatStarted', | ||
'serverHeartbeatSucceeded', 'serverHearbeatFailed', 'serverClosed']; | ||
events.forEach(function(e) { | ||
@@ -1011,4 +1060,5 @@ self.removeAllListeners(e); | ||
var setHaTimer = function(self, state) { | ||
if(state.highAvailabilityProcessRunning) return; | ||
// all haTimers are set to to repeat, so we pass norepeat false | ||
self.s.haTimer = setTimeout(replicasetInquirer(self, state, false), state.haInterval); | ||
self.s.haTimer = setTimeout(replicasetInquirer(self, state, false), state.currentHaInterval); | ||
return self.s.haTimer; | ||
@@ -1037,2 +1087,9 @@ } | ||
// Do we have a primary, ensure we only monitor by the haInterval | ||
if(state.replState.isPrimaryConnected()) { | ||
self.s.currentHaInterval = self.s.haInterval; | ||
} else { | ||
self.s.currentHaInterval = self.s.minHeartbeatFrequencyMS; | ||
} | ||
// Started processes | ||
@@ -1055,2 +1112,5 @@ state.highAvailabilityProcessRunning = true; | ||
opts.monitoring = true; | ||
opts.topologyId = self.s.id; | ||
// Server is in topology | ||
opts.inTopology = true; | ||
// Set up tags if any | ||
@@ -1101,2 +1161,10 @@ if(state.tag) opts.tag = stage.tag; | ||
// SDAM Monitoring events | ||
server.on('serverOpening', function(e) { self.emit('serverOpening', e); }); | ||
server.on('serverDescriptionChanged', function(e) { self.emit('serverDescriptionChanged', e); }); | ||
server.on('serverHeartbeatStarted', function(e) { self.emit('serverHeartbeatStarted', e); }); | ||
server.on('serverHeartbeatSucceeded', function(e) { self.emit('serverHeartbeatSucceeded', e); }); | ||
server.on('serverHearbeatFailed', function(e) { self.emit('serverHearbeatFailed', e); }); | ||
server.on('serverClosed', function(e) { self.emit('serverClosed', e); }); | ||
// Ensure we schedule the opening of new socket | ||
@@ -1164,4 +1232,3 @@ // on separate ticks of the event loop | ||
state.highAvailabilityProcessRunning = false; | ||
// // Return the replicasetInquirer | ||
// if(!norepeat) setHaTimer(self, state); | ||
// Return the replicasetInquirer | ||
return callback(); | ||
@@ -1177,2 +1244,6 @@ } | ||
// Update server instance ismaster to ensure proper sync | ||
// when producing SDAM monitoring events | ||
server.s.ismaster = ismaster; | ||
// Update the replicaset state | ||
@@ -1295,3 +1366,5 @@ state.replState.update(ismaster, server); | ||
// Remove any non used handlers | ||
['error', 'close', 'timeout', 'connect'].forEach(function(e) { | ||
['error', 'close', 'timeout', 'connect', | ||
'serverOpening', 'serverDescriptionChanged', 'serverHeartbeatStarted', | ||
'serverHeartbeatSucceeded', 'serverHearbeatFailed', 'serverClosed'].forEach(function(e) { | ||
server.removeAllListeners(e); | ||
@@ -1358,3 +1431,5 @@ }) | ||
// Deal with events | ||
var events = ['error', 'close', 'timeout', 'connect', 'message']; | ||
var events = ['error', 'close', 'timeout', 'connect', 'message', | ||
'serverOpening', 'serverDescriptionChanged', 'serverHeartbeatStarted', | ||
'serverHeartbeatSucceeded', 'serverHearbeatFailed', 'serverClosed']; | ||
// Remove any non used handlers | ||
@@ -1385,2 +1460,10 @@ events.forEach(function(e) { | ||
server.on('timeout', timeoutHandler(self, state)); | ||
// SDAM Monitoring events | ||
server.on('serverOpening', function(e) { self.emit('serverOpening', e); }); | ||
server.on('serverDescriptionChanged', function(e) { self.emit('serverDescriptionChanged', e); }); | ||
server.on('serverHeartbeatStarted', function(e) { self.emit('serverHeartbeatStarted', e); }); | ||
server.on('serverHeartbeatSucceeded', function(e) { self.emit('serverHeartbeatSucceeded', e); }); | ||
server.on('serverHearbeatFailed', function(e) { self.emit('serverHearbeatFailed', e); }); | ||
server.on('serverClosed', function(e) { self.emit('serverClosed', e); }); | ||
} | ||
@@ -1487,5 +1570,8 @@ | ||
opts.emitError = true; | ||
// Server is in topology | ||
opts.inTopology = true; | ||
// Set the size to size + 1 and mark monitoring | ||
opts.size = opts.size + 1; | ||
opts.monitoring = true; | ||
opts.topologyId = self.s.id; | ||
@@ -1507,2 +1593,10 @@ // Do we have an arbiter set the poolSize to 1 | ||
// SDAM Monitoring events | ||
server.on('serverOpening', function(e) { self.emit('serverOpening', e); }); | ||
server.on('serverDescriptionChanged', function(e) { self.emit('serverDescriptionChanged', e); }); | ||
server.on('serverHeartbeatStarted', function(e) { self.emit('serverHeartbeatStarted', e); }); | ||
server.on('serverHeartbeatSucceeded', function(e) { self.emit('serverHeartbeatSucceeded', e); }); | ||
server.on('serverHearbeatFailed', function(e) { self.emit('serverHearbeatFailed', e); }); | ||
server.on('serverClosed', function(e) { self.emit('serverClosed', e); }); | ||
// Attempt to connect | ||
@@ -1522,3 +1616,5 @@ process.nextTick(function() { | ||
// Remove any non used handlers | ||
['error', 'close', 'timeout', 'connect'].forEach(function(e) { | ||
['error', 'close', 'timeout', 'connect', | ||
'serverOpening', 'serverDescriptionChanged', 'serverHeartbeatStarted', | ||
'serverHeartbeatSucceeded', 'serverHearbeatFailed', 'serverClosed'].forEach(function(e) { | ||
server.removeAllListeners(e); | ||
@@ -1525,0 +1621,0 @@ }) |
@@ -213,3 +213,5 @@ "use strict"; | ||
// Remove any non used handlers | ||
var events = ['error', 'close', 'timeout', 'parseError']; | ||
var events = ['error', 'close', 'timeout', 'parseError', | ||
'serverOpening', 'serverDescriptionChanged', 'serverHeartbeatStarted', | ||
'serverHeartbeatSucceeded', 'serverHearbeatFailed', 'serverClosed']; | ||
events.forEach(function(e) { | ||
@@ -409,3 +411,14 @@ state.pool.removeAllListeners(e); | ||
if(state.callbacks) state.callbacks.flush(new MongoError(f("server %s sockets closed", self.name))); | ||
// Emit error event | ||
// Emit opening server event | ||
if(self.listeners('serverClosed').length > 0) self.emit('serverClosed', { | ||
topologyId: self.s.topologyId != -1 ? self.s.topologyId : self.s.id, address: self.name | ||
}); | ||
// Emit toplogy opening event if not in topology | ||
if(self.listeners('topologyClosed').length > 0 && !self.s.inTopology) { | ||
self.emit('topologyClosed', { topologyId: self.s.id }); | ||
} | ||
// Emit close event | ||
self.emit('close', err, self); | ||
@@ -459,5 +472,26 @@ // If we specified the driver to reconnect perform it | ||
state.state = DISCONNECTED; | ||
// Emit opening server event | ||
if(self.listeners('serverClosed').length > 0) self.emit('serverClosed', { | ||
topologyId: self.s.topologyId != -1 ? self.s.topologyId : self.s.id, address: self.name | ||
}); | ||
// Emit toplogy opening event if not in topology | ||
if(!self.s.inTopology) { | ||
self.emit('topologyOpening', { topologyId: self.s.id }); | ||
} | ||
return self.emit('close', err, self); | ||
} | ||
// Emit server description changed if something listening | ||
emitServerDescriptionChanged(self, { | ||
address: self.name, arbiters: [], hosts: [], passives: [], type: !self.s.inTopology ? 'Standalone' : getTopologyType(self) | ||
}); | ||
// Emit topology description changed if something listening | ||
emitTopologyDescriptionChanged(self, { | ||
topologyType: 'Single', servers: [{address: self.name, arbiters: [], hosts: [], passives: [], type: 'Standalone'}] | ||
}); | ||
// Set the latency for this instance | ||
@@ -625,2 +659,4 @@ state.isMasterLatencyMS = new Date().getTime() - start; | ||
, id: serverId++ | ||
// Shared topology id if part of another one | ||
, topologyId: options.topologyId || -1 | ||
// Grouping tag used for debugging purposes | ||
@@ -646,2 +682,4 @@ , tag: options.tag | ||
, isMasterLatencyMS: 0 | ||
// Is the server in a topology | ||
, inTopology: typeof options.inTopology == 'boolean' ? options.inTopology : false | ||
// Server details | ||
@@ -653,2 +691,6 @@ , serverDetails: { | ||
} | ||
// Current server description | ||
, serverDescription: null | ||
// Current topology description | ||
, topologyDescription: null | ||
} | ||
@@ -669,6 +711,2 @@ | ||
// // If we are monitoring this server we will create an exclusive reserved socket for that | ||
// this.monitoring = typeof options.monitoring == 'boolean' ? options.monitoring : false; | ||
// this.haInterval = options.haInterval || 10000; | ||
// Set error properties | ||
@@ -693,3 +731,83 @@ getProperty(this, 'name', 'name', s.serverDetails, {}); | ||
var getPreviousDescription = function(self) { | ||
if(!self.s.serverDescription) { | ||
self.s.serverDescription = { | ||
address: self.name, | ||
arbiters: [], hosts: [], passives: [], type: 'Unknown' | ||
} | ||
} | ||
return self.s.serverDescription; | ||
} | ||
var emitServerDescriptionChanged = function(self, description) { | ||
if(self.listeners('serverDescriptionChanged').length > 0) { | ||
// Emit the server description changed events | ||
self.emit('serverDescriptionChanged', { | ||
topologyId: self.s.topologyId != -1 ? self.s.topologyId : self.s.id, address: self.name, | ||
previousDescription: getPreviousDescription(self), | ||
newDescription: description | ||
}); | ||
self.s.serverDescription = description; | ||
} | ||
} | ||
var getPreviousTopologyDescription = function(self) { | ||
if(!self.s.topologyDescription) { | ||
self.s.topologyDescription = { | ||
topologyType: 'Unknown', | ||
servers: [{ | ||
address: self.name, arbiters: [], hosts: [], passives: [], type: 'Unknown' | ||
}] | ||
} | ||
} | ||
return self.s.topologyDescription; | ||
} | ||
var emitTopologyDescriptionChanged = function(self, description) { | ||
if(self.listeners('topologyDescriptionChanged').length > 0) { | ||
// Emit the server description changed events | ||
self.emit('topologyDescriptionChanged', { | ||
topologyId: self.s.topologyId != -1 ? self.s.topologyId : self.s.id, address: self.name, | ||
previousDescription: getPreviousTopologyDescription(self), | ||
newDescription: description | ||
}); | ||
self.s.serverDescription = description; | ||
} | ||
} | ||
/** | ||
* Emit event if it exists | ||
* @method | ||
*/ | ||
function emitSDAMEvent(self, event, description) { | ||
if(self.listeners(event).length > 0) { | ||
self.emit(event, description); | ||
} | ||
} | ||
/** | ||
* Get the server description | ||
* @method | ||
* @return {object} | ||
*/ | ||
Server.prototype.getDescription = function() { | ||
var ismaster = this.s.ismaster || {}; | ||
var description = { | ||
type: getTopologyType(this), | ||
address: this.name, | ||
}; | ||
// Add fields if available | ||
if(ismaster.hosts) description.hosts = ismaster.hosts; | ||
if(ismaster.arbiters) description.arbiters = ismaster.arbiters; | ||
if(ismaster.passives) description.passives = ismaster.passives; | ||
if(ismaster.setName) description.setName = ismaster.setName; | ||
return description; | ||
} | ||
/** | ||
* Execute a command | ||
@@ -732,2 +850,11 @@ * @method | ||
/** | ||
* Returns the last known ismaster response latency | ||
* @method | ||
* @return {object} | ||
*/ | ||
Server.prototype.isMasterLatencyMS = function() { | ||
return this.s.isMasterLatencyMS; | ||
} | ||
/** | ||
* Initiate server connect | ||
@@ -766,2 +893,12 @@ * @method | ||
// Emit toplogy opening event if not in topology | ||
if(!self.s.inTopology) { | ||
this.emit('topologyOpening', { topologyId: this.s.id }); | ||
} | ||
// Emit opening server event | ||
self.emit('serverOpening', { | ||
topologyId: self.s.topologyId != -1 ? self.s.topologyId : self.s.id, address: self.name | ||
}); | ||
// | ||
@@ -799,9 +936,58 @@ // Handle new connections | ||
var getTopologyType = function(self, ismaster) { | ||
if(!ismaster) { | ||
ismaster = self.s.ismaster; | ||
} | ||
if(!ismaster) return 'Unknown'; | ||
if(ismaster.ismaster && !ismaster.hosts) return 'Standalone'; | ||
if(ismaster.ismaster) return 'RSPrimary'; | ||
if(ismaster.secondary) return 'RSSecondary'; | ||
if(ismaster.arbiterOnly) return 'RSArbiter'; | ||
return 'Unknown'; | ||
} | ||
var changedIsMaster = function(self, currentIsmaster, ismaster) { | ||
var currentType = getTopologyType(self, currentIsmaster); | ||
var newType = getTopologyType(self, ismaster); | ||
if(newType != currentType) return true; | ||
return false; | ||
} | ||
var inquireServerState = function(self) { | ||
return function() { | ||
if(self.s.state == DESTROYED) return; | ||
// Record response time | ||
var start = new Date().getTime(); | ||
// emitSDAMEvent | ||
emitSDAMEvent(self, 'serverHeartbeatStarted', { connectionId: self.name }); | ||
// Attempt to execute ismaster command | ||
self.command('admin.$cmd', { ismaster:true }, { monitoring:true }, function(err, r) { | ||
if(!err) { | ||
// Legacy event sender | ||
self.emit('ismaster', r, self); | ||
// Calculate latencyMS | ||
var latencyMS = new Date().getTime() - start; | ||
// Server heart beat event | ||
emitSDAMEvent(self, 'serverHeartbeatSucceeded', { durationMS: latencyMS, reply: r.result, connectionId: self.name }); | ||
// Did the server change | ||
if(changedIsMaster(self, self.s.ismaster, r.result)) { | ||
// Emit server description changed if something listening | ||
emitServerDescriptionChanged(self, { | ||
address: self.name, arbiters: [], hosts: [], passives: [], type: !self.s.inTopology ? 'Standalone' : getTopologyType(self) | ||
}); | ||
} | ||
// Updat ismaster view | ||
self.s.ismaster = r.result; | ||
// Set server response time | ||
self.s.isMasterLatencyMS = latencyMS; | ||
} else { | ||
emitSDAMEvent(self, 'serverHearbeatFailed', { durationMS: latencyMS, failure: err, connectionId: self.name }); | ||
} | ||
@@ -836,2 +1022,12 @@ | ||
// Emit opening server event | ||
if(self.listeners('serverClosed').length > 0) self.emit('serverClosed', { | ||
topologyId: self.s.topologyId != -1 ? self.s.topologyId : self.s.id, address: self.name | ||
}); | ||
// Emit toplogy opening event if not in topology | ||
if(self.listeners('topologyClosed').length > 0 && !self.s.inTopology) { | ||
self.emit('topologyClosed', { topologyId: self.s.id }); | ||
} | ||
// Emit destroy event | ||
@@ -838,0 +1034,0 @@ if(emitDestroy) self.emit('destroy', self); |
{ | ||
"name": "mongodb-core", | ||
"version": "1.3.7", | ||
"version": "1.3.8", | ||
"description": "Core MongoDB driver functionality, no bells and whistles and meant for integration not end applications", | ||
@@ -5,0 +5,0 @@ "main": "index.js", |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
484752
9894