mongodb-core
Advanced tools
Comparing version 2.1.10 to 2.1.11
@@ -0,1 +1,8 @@ | ||
2.1.11 2017-05-22 | ||
----------------- | ||
* NODE-987 Clear out old intervalIds on when calling topologyMonitor. | ||
* NODE-987 Moved filtering to pingServer method and added test case. | ||
* Check for connection destroyed just before writing out and flush out operations correctly if it is (Issue #179, https://github.com/jmholzinger). | ||
* NODE-989 Refactored Replicaset monitoring to correcly monitor newly added servers, Also extracted setTimeout and setInterval to use custom wrappers Timeout and Interva. | ||
2.1.10 2017-04-18 | ||
@@ -2,0 +9,0 @@ ----------------- |
@@ -520,6 +520,17 @@ "use strict"; | ||
// Write out the command | ||
if(!Array.isArray(buffer)) return this.connection.write(buffer, 'binary'); | ||
// Iterate over all buffers and write them in order to the socket | ||
for(i = 0; i < buffer.length; i++) this.connection.write(buffer[i], 'binary'); | ||
// Double check that the connection is not destroyed | ||
if(this.connection.destroyed === false) { | ||
// Write out the command | ||
if(!Array.isArray(buffer)) { | ||
this.connection.write(buffer, 'binary'); | ||
return true; | ||
} | ||
// Iterate over all buffers and write them in order to the socket | ||
for(i = 0; i < buffer.length; i++) this.connection.write(buffer[i], 'binary'); | ||
return true; | ||
} | ||
// Connection is destroyed return write failed | ||
return false; | ||
} | ||
@@ -526,0 +537,0 @@ |
@@ -93,2 +93,5 @@ "use strict"; | ||
// console.log("=================================== pool options") | ||
// console.dir(this.options) | ||
// Identification information | ||
@@ -236,2 +239,5 @@ this.id = _id++; | ||
return function(err) { | ||
// console.log("========== connectionFailureHandler :: " + event) | ||
// console.dir(err) | ||
if (this._connectionFailHandled) return; | ||
@@ -288,2 +294,3 @@ this._connectionFailHandled = true; | ||
return function() { | ||
// console.log("========================= attemptReconnect") | ||
self.emit('attemptReconnect', self); | ||
@@ -299,4 +306,5 @@ if(self.state == DESTROYED || self.state == DESTROYING) return; | ||
// If we have failure schedule a retry | ||
function _connectionFailureHandler(self) { | ||
function _connectionFailureHandler(self, event) { | ||
return function() { | ||
// console.log("========== _connectionFailureHandler :: " + event) | ||
if (this._connectionFailHandled) return; | ||
@@ -1275,13 +1283,23 @@ this._connectionFailHandled = true; | ||
// Capture if write was successful | ||
var writeSuccessful = true; | ||
// Put operation on the wire | ||
if(Array.isArray(buffer)) { | ||
for(var i = 0; i < buffer.length; i++) { | ||
connection.write(buffer[i]) | ||
writeSuccessful = connection.write(buffer[i]) | ||
} | ||
} else { | ||
connection.write(buffer); | ||
writeSuccessful = connection.write(buffer); | ||
} | ||
if(workItem.immediateRelease && self.authenticating) { | ||
if(writeSuccessful && workItem.immediateRelease && self.authenticating) { | ||
self.nonAuthenticatedConnections.push(connection); | ||
} else if(writeSuccessful === false) { | ||
// If write not successful put back on queue | ||
self.queue.unshift(workItem); | ||
// Remove the disconnected connection | ||
removeConnection(self, connection); | ||
// Flush any monitoring operations in the queue, failing fast | ||
flushMonitoringOperations(self.queue); | ||
} | ||
@@ -1288,0 +1306,0 @@ } else { |
@@ -506,2 +506,3 @@ "use strict" | ||
// Emit secondary joined replicaset | ||
self.emit('joined', 'secondary', server); | ||
@@ -508,0 +509,0 @@ emitTopologyDescriptionChanged(self); |
@@ -15,2 +15,4 @@ "use strict" | ||
clone = require('./shared').clone, | ||
Timeout = require('./shared').Timeout, | ||
Interval = require('./shared').Interval, | ||
createClientInfo = require('./shared').createClientInfo; | ||
@@ -292,2 +294,5 @@ | ||
// Enalbe the monitoring of the new server | ||
monitorServer(_self.lastIsMaster().me, self, {}); | ||
// Rexecute any stalled operation | ||
@@ -433,97 +438,118 @@ rexecuteOperations(self); | ||
function topologyMonitor(self, options) { | ||
if(self.state == DESTROYED || self.state == UNREFERENCED) return; | ||
options = options || {}; | ||
// Each server is monitored in parallel in their own timeout loop | ||
var monitorServer = function(host, self, options) { | ||
// If this is not the initial scan | ||
// Is this server already being monitoried, then skip monitoring | ||
if(!options.haInterval) { | ||
for(var i = 0; i < self.intervalIds.length; i++) { | ||
if(self.intervalIds[i].__host === host) { | ||
return; | ||
} | ||
} | ||
} | ||
var servers = Object.keys(self.s.replicaSetState.set); | ||
// Get the haInterval | ||
var _process = options.haInterval ? setTimeout : setInterval; | ||
var _process = options.haInterval ? Timeout : Interval; | ||
var _haInterval = options.haInterval ? options.haInterval : self.s.haInterval; | ||
// Count of initial sweep servers to check | ||
var count = servers.length; | ||
// Create the interval | ||
var intervalId = new _process(function() { | ||
if(self.state == DESTROYED || self.state == UNREFERENCED) { | ||
// clearInterval(intervalId); | ||
intervalId.stop(); | ||
return; | ||
} | ||
// Each server is monitored in parallel in their own timeout loop | ||
var monitorServer = function(_host, _self, _options) { | ||
var intervalId = _process(function() { | ||
if(self.state == DESTROYED || self.state == UNREFERENCED) { | ||
clearInterval(intervalId); | ||
return; | ||
} | ||
// Do we already have server connection available for this host | ||
var _server = self.s.replicaSetState.get(host); | ||
// Adjust the count | ||
count = count - 1; | ||
// Check if we have a known server connection and reuse | ||
if(_server) { | ||
// Ping the server | ||
return pingServer(self, _server, function(err) { | ||
if(self.state == DESTROYED || self.state == UNREFERENCED) { | ||
intervalId.stop(); | ||
return; | ||
} | ||
// Do we already have server connection available for this host | ||
var _server = _self.s.replicaSetState.get(_host); | ||
// Filter out all called intervaliIds | ||
self.intervalIds = self.intervalIds.filter(function(intervalId) { | ||
return intervalId.isRunning(); | ||
} ); | ||
// Check if we have a known server connection and reuse | ||
if(_server) { | ||
return pingServer(_self, _server, function(err) { | ||
if(self.state == DESTROYED || self.state == UNREFERENCED) { | ||
clearInterval(intervalId); | ||
return; | ||
// Initial sweep | ||
if(_process === Timeout) { | ||
if(self.state == CONNECTING && ( | ||
( | ||
self.s.replicaSetState.hasSecondary() | ||
&& self.s.options.secondaryOnlyConnectionAllowed | ||
) | ||
|| self.s.replicaSetState.hasPrimary() | ||
)) { | ||
self.state = CONNECTED; | ||
// Emit connected sign | ||
process.nextTick(function() { | ||
self.emit('connect', self); | ||
}); | ||
// Start topology interval check | ||
topologyMonitor(self, {}); | ||
} | ||
} else { | ||
if(self.state == DISCONNECTED && ( | ||
( | ||
self.s.replicaSetState.hasSecondary() | ||
&& self.s.options.secondaryOnlyConnectionAllowed | ||
) | ||
|| self.s.replicaSetState.hasPrimary() | ||
)) { | ||
self.state = CONNECTED; | ||
// Initial sweep | ||
if(_process === setTimeout) { | ||
if(_self.state == CONNECTING && ( | ||
( | ||
self.s.replicaSetState.hasSecondary() | ||
&& self.s.options.secondaryOnlyConnectionAllowed | ||
) | ||
|| self.s.replicaSetState.hasPrimary() | ||
)) { | ||
_self.state = CONNECTED; | ||
// Rexecute any stalled operation | ||
rexecuteOperations(self); | ||
// Emit connected sign | ||
process.nextTick(function() { | ||
self.emit('connect', self); | ||
}); | ||
// Emit connected sign | ||
process.nextTick(function() { | ||
self.emit('reconnect', self); | ||
}); | ||
} | ||
} | ||
// Start topology interval check | ||
topologyMonitor(_self, {}); | ||
} | ||
} else { | ||
if(_self.state == DISCONNECTED && ( | ||
( | ||
self.s.replicaSetState.hasSecondary() | ||
&& self.s.options.secondaryOnlyConnectionAllowed | ||
) | ||
|| self.s.replicaSetState.hasPrimary() | ||
)) { | ||
_self.state = CONNECTED; | ||
if(self.initialConnectState.connect | ||
&& !self.initialConnectState.fullsetup | ||
&& self.s.replicaSetState.hasPrimaryAndSecondary()) { | ||
// Set initial connect state | ||
self.initialConnectState.fullsetup = true; | ||
self.initialConnectState.all = true; | ||
// Rexecute any stalled operation | ||
rexecuteOperations(self); | ||
process.nextTick(function() { | ||
self.emit('fullsetup', self); | ||
self.emit('all', self); | ||
}); | ||
} | ||
}); | ||
} | ||
}, _haInterval); | ||
// Emit connected sign | ||
process.nextTick(function() { | ||
self.emit('reconnect', self); | ||
}); | ||
} | ||
} | ||
// Start the interval | ||
intervalId.start(); | ||
// Add the intervalId host name | ||
intervalId.__host = host; | ||
// Add the intervalId to our list of intervalIds | ||
self.intervalIds.push(intervalId); | ||
} | ||
if(self.initialConnectState.connect | ||
&& !self.initialConnectState.fullsetup | ||
&& self.s.replicaSetState.hasPrimaryAndSecondary()) { | ||
// Set initial connect state | ||
self.initialConnectState.fullsetup = true; | ||
self.initialConnectState.all = true; | ||
function topologyMonitor(self, options) { | ||
if(self.state == DESTROYED || self.state == UNREFERENCED) return; | ||
options = options || {}; | ||
process.nextTick(function() { | ||
self.emit('fullsetup', self); | ||
self.emit('all', self); | ||
}); | ||
} | ||
}); | ||
} | ||
}, _haInterval); | ||
// Get the servers | ||
var servers = Object.keys(self.s.replicaSetState.set); | ||
// Add the intervalId to our list of intervalIds | ||
self.intervalIds.push(intervalId); | ||
} | ||
// Get the haInterval | ||
var _process = options.haInterval ? Timeout : Interval; | ||
var _haInterval = options.haInterval ? options.haInterval : self.s.haInterval; | ||
if(_process === setTimeout) { | ||
if(_process === Timeout) { | ||
return connectNewServers(self, self.s.replicaSetState.unknownServers, function(err) { | ||
@@ -558,7 +584,7 @@ if(!self.s.replicaSetState.hasPrimary() && !self.s.options.secondaryOnlyConnectionAllowed) { | ||
connectNewServers(self, self.s.replicaSetState.unknownServers, function() { | ||
if(self.s.replicaSetState.hasPrimary()) { | ||
self.intervalIds.push(setTimeout(executeReconnect(self), _haInterval)); | ||
} else { | ||
self.intervalIds.push(setTimeout(executeReconnect(self), self.s.minHeartbeatFrequencyMS)); | ||
} | ||
var monitoringFrequencey = self.s.replicaSetState.hasPrimary() | ||
? _haInterval : self.s.minHeartbeatFrequencyMS; | ||
// Create a timeout | ||
self.intervalIds.push(new Timeout(executeReconnect(self), monitoringFrequencey).start()); | ||
}); | ||
@@ -573,3 +599,3 @@ } | ||
self.intervalIds.push(setTimeout(executeReconnect(self), intervalTime)); | ||
self.intervalIds.push(new Timeout(executeReconnect(self), intervalTime).start()); | ||
} | ||
@@ -855,4 +881,4 @@ | ||
for(var i = 0; i < this.intervalIds.length; i++) { | ||
clearInterval(this.intervalIds[i]); | ||
clearTimeout(this.intervalIds[i]); | ||
this.intervalIds[i].stop(); | ||
this.intervalIds[i].stop(); | ||
} | ||
@@ -859,0 +885,0 @@ |
@@ -218,2 +218,46 @@ "use strict" | ||
function Interval(fn, time) { | ||
var timer = false; | ||
this.start = function () { | ||
if (!this.isRunning()) { | ||
timer = setInterval(fn, time); | ||
} | ||
return this; | ||
}; | ||
this.stop = function () { | ||
clearInterval(timer); | ||
timer = false; | ||
return this; | ||
}; | ||
this.isRunning = function () { | ||
return timer !== false; | ||
}; | ||
} | ||
function Timeout(fn, time) { | ||
var timer = false; | ||
this.start = function () { | ||
if (!this.isRunning()) { | ||
timer = setTimeout(fn, time); | ||
} | ||
return this; | ||
}; | ||
this.stop = function () { | ||
clearTimeout(timer); | ||
timer = false; | ||
return this; | ||
}; | ||
this.isRunning = function () { | ||
if(timer && timer._called) return false; | ||
return timer !== false; | ||
}; | ||
} | ||
module.exports.inquireServerState = inquireServerState | ||
@@ -227,1 +271,3 @@ module.exports.getTopologyType = getTopologyType; | ||
module.exports.clone = clone; | ||
module.exports.Interval = Interval; | ||
module.exports.Timeout = Timeout; |
{ | ||
"name": "mongodb-core", | ||
"version": "2.1.10", | ||
"version": "2.1.11", | ||
"description": "Core MongoDB driver functionality, no bells and whistles and meant for integration not end applications", | ||
@@ -5,0 +5,0 @@ "main": "index.js", |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
603768
10501