Socket
Socket
Sign inDemoInstall

mongodb-core

Package Overview
Dependencies
Maintainers
1
Versions
177
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

mongodb-core - npm Package Compare versions

Comparing version 2.1.10 to 2.1.11

7

HISTORY.md

@@ -0,1 +1,8 @@

2.1.11 2017-05-22
-----------------
* NODE-987 Clear out old intervalIds on when calling topologyMonitor.
* NODE-987 Moved filtering to pingServer method and added test case.
* Check for connection destroyed just before writing out and flush out operations correctly if it is (Issue #179, https://github.com/jmholzinger).
* NODE-989 Refactored Replicaset monitoring to correcly monitor newly added servers, Also extracted setTimeout and setInterval to use custom wrappers Timeout and Interva.
2.1.10 2017-04-18

@@ -2,0 +9,0 @@ -----------------

19

lib/connection/connection.js

@@ -520,6 +520,17 @@ "use strict";

// Write out the command
if(!Array.isArray(buffer)) return this.connection.write(buffer, 'binary');
// Iterate over all buffers and write them in order to the socket
for(i = 0; i < buffer.length; i++) this.connection.write(buffer[i], 'binary');
// Double check that the connection is not destroyed
if(this.connection.destroyed === false) {
// Write out the command
if(!Array.isArray(buffer)) {
this.connection.write(buffer, 'binary');
return true;
}
// Iterate over all buffers and write them in order to the socket
for(i = 0; i < buffer.length; i++) this.connection.write(buffer[i], 'binary');
return true;
}
// Connection is destroyed return write failed
return false;
}

@@ -526,0 +537,0 @@

@@ -93,2 +93,5 @@ "use strict";

// console.log("=================================== pool options")
// console.dir(this.options)
// Identification information

@@ -236,2 +239,5 @@ this.id = _id++;

return function(err) {
// console.log("========== connectionFailureHandler :: " + event)
// console.dir(err)
if (this._connectionFailHandled) return;

@@ -288,2 +294,3 @@ this._connectionFailHandled = true;

return function() {
// console.log("========================= attemptReconnect")
self.emit('attemptReconnect', self);

@@ -299,4 +306,5 @@ if(self.state == DESTROYED || self.state == DESTROYING) return;

// If we have failure schedule a retry
function _connectionFailureHandler(self) {
function _connectionFailureHandler(self, event) {
return function() {
// console.log("========== _connectionFailureHandler :: " + event)
if (this._connectionFailHandled) return;

@@ -1275,13 +1283,23 @@ this._connectionFailHandled = true;

// Capture if write was successful
var writeSuccessful = true;
// Put operation on the wire
if(Array.isArray(buffer)) {
for(var i = 0; i < buffer.length; i++) {
connection.write(buffer[i])
writeSuccessful = connection.write(buffer[i])
}
} else {
connection.write(buffer);
writeSuccessful = connection.write(buffer);
}
if(workItem.immediateRelease && self.authenticating) {
if(writeSuccessful && workItem.immediateRelease && self.authenticating) {
self.nonAuthenticatedConnections.push(connection);
} else if(writeSuccessful === false) {
// If write not successful put back on queue
self.queue.unshift(workItem);
// Remove the disconnected connection
removeConnection(self, connection);
// Flush any monitoring operations in the queue, failing fast
flushMonitoringOperations(self.queue);
}

@@ -1288,0 +1306,0 @@ } else {

@@ -506,2 +506,3 @@ "use strict"

// Emit secondary joined replicaset
self.emit('joined', 'secondary', server);

@@ -508,0 +509,0 @@ emitTopologyDescriptionChanged(self);

@@ -15,2 +15,4 @@ "use strict"

clone = require('./shared').clone,
Timeout = require('./shared').Timeout,
Interval = require('./shared').Interval,
createClientInfo = require('./shared').createClientInfo;

@@ -292,2 +294,5 @@

// Enalbe the monitoring of the new server
monitorServer(_self.lastIsMaster().me, self, {});
// Rexecute any stalled operation

@@ -433,97 +438,118 @@ rexecuteOperations(self);

function topologyMonitor(self, options) {
if(self.state == DESTROYED || self.state == UNREFERENCED) return;
options = options || {};
// Each server is monitored in parallel in their own timeout loop
var monitorServer = function(host, self, options) {
// If this is not the initial scan
// Is this server already being monitoried, then skip monitoring
if(!options.haInterval) {
for(var i = 0; i < self.intervalIds.length; i++) {
if(self.intervalIds[i].__host === host) {
return;
}
}
}
var servers = Object.keys(self.s.replicaSetState.set);
// Get the haInterval
var _process = options.haInterval ? setTimeout : setInterval;
var _process = options.haInterval ? Timeout : Interval;
var _haInterval = options.haInterval ? options.haInterval : self.s.haInterval;
// Count of initial sweep servers to check
var count = servers.length;
// Create the interval
var intervalId = new _process(function() {
if(self.state == DESTROYED || self.state == UNREFERENCED) {
// clearInterval(intervalId);
intervalId.stop();
return;
}
// Each server is monitored in parallel in their own timeout loop
var monitorServer = function(_host, _self, _options) {
var intervalId = _process(function() {
if(self.state == DESTROYED || self.state == UNREFERENCED) {
clearInterval(intervalId);
return;
}
// Do we already have server connection available for this host
var _server = self.s.replicaSetState.get(host);
// Adjust the count
count = count - 1;
// Check if we have a known server connection and reuse
if(_server) {
// Ping the server
return pingServer(self, _server, function(err) {
if(self.state == DESTROYED || self.state == UNREFERENCED) {
intervalId.stop();
return;
}
// Do we already have server connection available for this host
var _server = _self.s.replicaSetState.get(_host);
// Filter out all called intervaliIds
self.intervalIds = self.intervalIds.filter(function(intervalId) {
return intervalId.isRunning();
} );
// Check if we have a known server connection and reuse
if(_server) {
return pingServer(_self, _server, function(err) {
if(self.state == DESTROYED || self.state == UNREFERENCED) {
clearInterval(intervalId);
return;
// Initial sweep
if(_process === Timeout) {
if(self.state == CONNECTING && (
(
self.s.replicaSetState.hasSecondary()
&& self.s.options.secondaryOnlyConnectionAllowed
)
|| self.s.replicaSetState.hasPrimary()
)) {
self.state = CONNECTED;
// Emit connected sign
process.nextTick(function() {
self.emit('connect', self);
});
// Start topology interval check
topologyMonitor(self, {});
}
} else {
if(self.state == DISCONNECTED && (
(
self.s.replicaSetState.hasSecondary()
&& self.s.options.secondaryOnlyConnectionAllowed
)
|| self.s.replicaSetState.hasPrimary()
)) {
self.state = CONNECTED;
// Initial sweep
if(_process === setTimeout) {
if(_self.state == CONNECTING && (
(
self.s.replicaSetState.hasSecondary()
&& self.s.options.secondaryOnlyConnectionAllowed
)
|| self.s.replicaSetState.hasPrimary()
)) {
_self.state = CONNECTED;
// Rexecute any stalled operation
rexecuteOperations(self);
// Emit connected sign
process.nextTick(function() {
self.emit('connect', self);
});
// Emit connected sign
process.nextTick(function() {
self.emit('reconnect', self);
});
}
}
// Start topology interval check
topologyMonitor(_self, {});
}
} else {
if(_self.state == DISCONNECTED && (
(
self.s.replicaSetState.hasSecondary()
&& self.s.options.secondaryOnlyConnectionAllowed
)
|| self.s.replicaSetState.hasPrimary()
)) {
_self.state = CONNECTED;
if(self.initialConnectState.connect
&& !self.initialConnectState.fullsetup
&& self.s.replicaSetState.hasPrimaryAndSecondary()) {
// Set initial connect state
self.initialConnectState.fullsetup = true;
self.initialConnectState.all = true;
// Rexecute any stalled operation
rexecuteOperations(self);
process.nextTick(function() {
self.emit('fullsetup', self);
self.emit('all', self);
});
}
});
}
}, _haInterval);
// Emit connected sign
process.nextTick(function() {
self.emit('reconnect', self);
});
}
}
// Start the interval
intervalId.start();
// Add the intervalId host name
intervalId.__host = host;
// Add the intervalId to our list of intervalIds
self.intervalIds.push(intervalId);
}
if(self.initialConnectState.connect
&& !self.initialConnectState.fullsetup
&& self.s.replicaSetState.hasPrimaryAndSecondary()) {
// Set initial connect state
self.initialConnectState.fullsetup = true;
self.initialConnectState.all = true;
function topologyMonitor(self, options) {
if(self.state == DESTROYED || self.state == UNREFERENCED) return;
options = options || {};
process.nextTick(function() {
self.emit('fullsetup', self);
self.emit('all', self);
});
}
});
}
}, _haInterval);
// Get the servers
var servers = Object.keys(self.s.replicaSetState.set);
// Add the intervalId to our list of intervalIds
self.intervalIds.push(intervalId);
}
// Get the haInterval
var _process = options.haInterval ? Timeout : Interval;
var _haInterval = options.haInterval ? options.haInterval : self.s.haInterval;
if(_process === setTimeout) {
if(_process === Timeout) {
return connectNewServers(self, self.s.replicaSetState.unknownServers, function(err) {

@@ -558,7 +584,7 @@ if(!self.s.replicaSetState.hasPrimary() && !self.s.options.secondaryOnlyConnectionAllowed) {

connectNewServers(self, self.s.replicaSetState.unknownServers, function() {
if(self.s.replicaSetState.hasPrimary()) {
self.intervalIds.push(setTimeout(executeReconnect(self), _haInterval));
} else {
self.intervalIds.push(setTimeout(executeReconnect(self), self.s.minHeartbeatFrequencyMS));
}
var monitoringFrequencey = self.s.replicaSetState.hasPrimary()
? _haInterval : self.s.minHeartbeatFrequencyMS;
// Create a timeout
self.intervalIds.push(new Timeout(executeReconnect(self), monitoringFrequencey).start());
});

@@ -573,3 +599,3 @@ }

self.intervalIds.push(setTimeout(executeReconnect(self), intervalTime));
self.intervalIds.push(new Timeout(executeReconnect(self), intervalTime).start());
}

@@ -855,4 +881,4 @@

for(var i = 0; i < this.intervalIds.length; i++) {
clearInterval(this.intervalIds[i]);
clearTimeout(this.intervalIds[i]);
this.intervalIds[i].stop();
this.intervalIds[i].stop();
}

@@ -859,0 +885,0 @@

@@ -218,2 +218,46 @@ "use strict"

function Interval(fn, time) {
var timer = false;
this.start = function () {
if (!this.isRunning()) {
timer = setInterval(fn, time);
}
return this;
};
this.stop = function () {
clearInterval(timer);
timer = false;
return this;
};
this.isRunning = function () {
return timer !== false;
};
}
function Timeout(fn, time) {
var timer = false;
this.start = function () {
if (!this.isRunning()) {
timer = setTimeout(fn, time);
}
return this;
};
this.stop = function () {
clearTimeout(timer);
timer = false;
return this;
};
this.isRunning = function () {
if(timer && timer._called) return false;
return timer !== false;
};
}
module.exports.inquireServerState = inquireServerState

@@ -227,1 +271,3 @@ module.exports.getTopologyType = getTopologyType;

module.exports.clone = clone;
module.exports.Interval = Interval;
module.exports.Timeout = Timeout;
{
"name": "mongodb-core",
"version": "2.1.10",
"version": "2.1.11",
"description": "Core MongoDB driver functionality, no bells and whistles and meant for integration not end applications",

@@ -5,0 +5,0 @@ "main": "index.js",

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc