Socket
Socket
Sign inDemoInstall

mongodb

Package Overview
Dependencies
Maintainers
8
Versions
551
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

mongodb - npm Package Compare versions

Comparing version 6.5.0-dev.20240403.sha.cb5903f to 6.5.0-dev.20240404.sha.0e3d6ea

179

lib/sdam/monitor.js
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.MonitorInterval = exports.RTTPinger = exports.Monitor = exports.ServerMonitoringMode = void 0;
exports.RTTSampler = exports.MonitorInterval = exports.RTTPinger = exports.Monitor = exports.ServerMonitoringMode = void 0;
const timers_1 = require("timers");

@@ -22,4 +22,2 @@ const bson_1 = require("../bson");

const kCancellationToken = Symbol('cancellationToken');
/** @internal */
const kRoundTripTime = Symbol('roundTripTime');
const STATE_IDLE = 'idle';

@@ -66,2 +64,3 @@ const STATE_MONITORING = 'monitoring';

this.mongoLogger = this[kServer].topology.client?.mongoLogger;
this.rttSampler = new RTTSampler(10);
const cancellationToken = this[kCancellationToken];

@@ -135,2 +134,17 @@ // TODO: refactor this to pull it directly from the pool, requires new ConnectionPool integration

}
get roundTripTime() {
return this.rttSampler.average();
}
get minRoundTripTime() {
return this.rttSampler.min();
}
get latestRtt() {
return this.rttSampler.last ?? 0; // FIXME: Check if this is acceptable
}
addRttSample(rtt) {
this.rttSampler.addSample(rtt);
}
clearRttSamples() {
this.rttSampler.clear();
}
}

@@ -146,2 +160,3 @@ exports.Monitor = Monitor;

monitor.connection = null;
monitor.clearRttSamples();
}

@@ -190,5 +205,8 @@ function useStreamingProtocol(monitor, topologyVersion) {

}
// NOTE: here we use the latestRtt as this measurement corresponds with the value
// obtained for this successful heartbeat
const duration = isAwaitable && monitor.rttPinger
? monitor.rttPinger.roundTripTime
? monitor.rttPinger.latestRtt ?? (0, utils_1.calculateDurationInMs)(start)
: (0, utils_1.calculateDurationInMs)(start);
monitor.addRttSample(duration);
monitor.emitAndLogHeartbeat(server_1.Server.SERVER_HEARTBEAT_SUCCEEDED, monitor[kServer].topology.s.id, hello.connectionId, new events_1.ServerHeartbeatSucceededEvent(monitor.address, duration, hello, isAwaitable));

@@ -227,3 +245,3 @@ if (isAwaitable) {

if (isAwaitable && monitor.rttPinger == null) {
monitor.rttPinger = new RTTPinger(monitor[kCancellationToken], Object.assign({ heartbeatFrequencyMS: monitor.options.heartbeatFrequencyMS }, monitor.connectOptions));
monitor.rttPinger = new RTTPinger(monitor);
}

@@ -267,4 +285,6 @@ // Record new start time before sending handshake

}
const duration = (0, utils_1.calculateDurationInMs)(start);
monitor.addRttSample(duration);
monitor.connection = connection;
monitor.emitAndLogHeartbeat(server_1.Server.SERVER_HEARTBEAT_SUCCEEDED, monitor[kServer].topology.s.id, connection.hello?.connectionId, new events_1.ServerHeartbeatSucceededEvent(monitor.address, (0, utils_1.calculateDurationInMs)(start), connection.hello, useStreamingProtocol(monitor, connection.hello?.topologyVersion)));
monitor.emitAndLogHeartbeat(server_1.Server.SERVER_HEARTBEAT_SUCCEEDED, monitor[kServer].topology.s.id, connection.hello?.connectionId, new events_1.ServerHeartbeatSucceededEvent(monitor.address, duration, connection.hello, useStreamingProtocol(monitor, connection.hello?.topologyVersion)));
callback(undefined, connection.hello);

@@ -319,13 +339,17 @@ }, error => {

class RTTPinger {
constructor(cancellationToken, options) {
constructor(monitor) {
this.connection = undefined;
this[kCancellationToken] = cancellationToken;
this[kRoundTripTime] = 0;
this[kCancellationToken] = monitor[kCancellationToken];
this.closed = false;
const heartbeatFrequencyMS = options.heartbeatFrequencyMS;
this[kMonitorId] = (0, timers_1.setTimeout)(() => measureRoundTripTime(this, options), heartbeatFrequencyMS);
this.monitor = monitor;
this.latestRtt = monitor.latestRtt;
const heartbeatFrequencyMS = monitor.options.heartbeatFrequencyMS;
this[kMonitorId] = (0, timers_1.setTimeout)(() => this.measureRoundTripTime(), heartbeatFrequencyMS);
}
get roundTripTime() {
return this[kRoundTripTime];
return this.monitor.roundTripTime;
}
get minRoundTripTime() {
return this.monitor.minRoundTripTime;
}
close() {

@@ -337,42 +361,41 @@ this.closed = true;

}
}
exports.RTTPinger = RTTPinger;
function measureRoundTripTime(rttPinger, options) {
const start = (0, utils_1.now)();
options.cancellationToken = rttPinger[kCancellationToken];
const heartbeatFrequencyMS = options.heartbeatFrequencyMS;
if (rttPinger.closed) {
return;
}
function measureAndReschedule(conn) {
if (rttPinger.closed) {
measureAndReschedule(start, conn) {
if (start == null) {
start = (0, utils_1.now)();
}
if (this.closed) {
conn?.destroy();
return;
}
if (rttPinger.connection == null) {
rttPinger.connection = conn;
if (this.connection == null) {
this.connection = conn;
}
rttPinger[kRoundTripTime] = (0, utils_1.calculateDurationInMs)(start);
rttPinger[kMonitorId] = (0, timers_1.setTimeout)(() => measureRoundTripTime(rttPinger, options), heartbeatFrequencyMS);
this.latestRtt = (0, utils_1.calculateDurationInMs)(start);
this[kMonitorId] = (0, timers_1.setTimeout)(() => this.measureRoundTripTime(), this.monitor.options.heartbeatFrequencyMS);
}
const connection = rttPinger.connection;
if (connection == null) {
measureRoundTripTime() {
const start = (0, utils_1.now)();
if (this.closed) {
return;
}
const connection = this.connection;
if (connection == null) {
// eslint-disable-next-line github/no-then
(0, connect_1.connect)(this.monitor.connectOptions).then(connection => {
this.measureAndReschedule(start, connection);
}, () => {
this.connection = undefined;
});
return;
}
const commandName = connection.serverApi?.version || connection.helloOk ? 'hello' : constants_1.LEGACY_HELLO_COMMAND;
// eslint-disable-next-line github/no-then
(0, connect_1.connect)(options).then(connection => {
measureAndReschedule(connection);
}, () => {
rttPinger.connection = undefined;
rttPinger[kRoundTripTime] = 0;
connection.command((0, utils_1.ns)('admin.$cmd'), { [commandName]: 1 }, undefined).then(() => this.measureAndReschedule(), () => {
this.connection?.destroy();
this.connection = undefined;
return;
});
return;
}
const commandName = connection.serverApi?.version || connection.helloOk ? 'hello' : constants_1.LEGACY_HELLO_COMMAND;
// eslint-disable-next-line github/no-then
connection.command((0, utils_1.ns)('admin.$cmd'), { [commandName]: 1 }, undefined).then(() => measureAndReschedule(), () => {
rttPinger.connection?.destroy();
rttPinger.connection = undefined;
rttPinger[kRoundTripTime] = 0;
return;
});
}
exports.RTTPinger = RTTPinger;
/**

@@ -471,2 +494,72 @@ * @internal

exports.MonitorInterval = MonitorInterval;
/** @internal
* This class implements the RTT sampling logic specified for [CSOT](https://github.com/mongodb/specifications/blob/bbb335e60cd7ea1e0f7cd9a9443cb95fc9d3b64d/source/client-side-operations-timeout/client-side-operations-timeout.md#drivers-use-minimum-rtt-to-short-circuit-operations)
*
* This is implemented as a [circular buffer](https://en.wikipedia.org/wiki/Circular_buffer) keeping
* the most recent `windowSize` samples
* */
class RTTSampler {
constructor(windowSize = 10) {
this.rttSamples = new Float64Array(windowSize);
this.length = 0;
this.writeIndex = 0;
}
/**
* Adds an rtt sample to the end of the circular buffer
* When `windowSize` samples have been collected, `addSample` overwrites the least recently added
* sample
*/
addSample(sample) {
this.rttSamples[this.writeIndex++] = sample;
if (this.length < this.rttSamples.length) {
this.length++;
}
this.writeIndex %= this.rttSamples.length;
}
/**
* When \< 2 samples have been collected, returns 0
* Otherwise computes the minimum value samples contained in the buffer
*/
min() {
if (this.length < 2)
return 0;
let min = this.rttSamples[0];
for (let i = 1; i < this.length; i++) {
if (this.rttSamples[i] < min)
min = this.rttSamples[i];
}
return min;
}
/**
* Returns mean of samples contained in the buffer
*/
average() {
if (this.length === 0)
return 0;
let sum = 0;
for (let i = 0; i < this.length; i++) {
sum += this.rttSamples[i];
}
return sum / this.length;
}
/**
* Returns most recently inserted element in the buffer
* Returns null if the buffer is empty
* */
get last() {
if (this.length === 0)
return null;
return this.rttSamples[this.writeIndex === 0 ? this.length - 1 : this.writeIndex - 1];
}
/**
* Clear the buffer
* NOTE: this does not overwrite the data held in the internal array, just the pointers into
* this array
*/
clear() {
this.length = 0;
this.writeIndex = 0;
}
}
exports.RTTSampler = RTTSampler;
//# sourceMappingURL=monitor.js.map

@@ -51,2 +51,3 @@ "use strict";

this.roundTripTime = options?.roundTripTime ?? -1;
this.minRoundTripTime = options?.minRoundTripTime ?? 0;
this.lastUpdateTime = (0, utils_1.now)();

@@ -53,0 +54,0 @@ this.lastWriteDate = hello?.lastWrite?.lastWriteDate ?? 0;

@@ -159,3 +159,3 @@ "use strict";

function latencyWindowReducer(topologyDescription, servers) {
const low = servers.reduce((min, server) => min === -1 ? server.roundTripTime : Math.min(server.roundTripTime, min), -1);
const low = servers.reduce((min, server) => Math.min(server.roundTripTime, min), Infinity);
const high = low + topologyDescription.localThresholdMS;

@@ -162,0 +162,0 @@ return servers.reduce((result, server) => {

@@ -57,3 +57,4 @@ "use strict";

this.emit(Server.DESCRIPTION_RECEIVED, new server_description_1.ServerDescription(this.description.hostAddress, event.reply, {
roundTripTime: calculateRoundTripTime(this.description.roundTripTime, event.duration)
roundTripTime: this.monitor?.roundTripTime,
minRoundTripTime: this.monitor?.minRoundTripTime
}));

@@ -319,9 +320,2 @@ if (this.s.state === common_1.STATE_CONNECTING) {

exports.Server = Server;
function calculateRoundTripTime(oldRtt, duration) {
if (oldRtt === -1) {
return duration;
}
const alpha = 0.2;
return alpha * duration + (1 - alpha) * oldRtt;
}
function markServerUnknown(server, error) {

@@ -328,0 +322,0 @@ // Load balancer servers can never be marked unknown.

{
"name": "mongodb",
"version": "6.5.0-dev.20240403.sha.cb5903f",
"version": "6.5.0-dev.20240404.sha.0e3d6ea",
"description": "The official MongoDB driver for Node.js",

@@ -5,0 +5,0 @@ "main": "lib/index.js",

@@ -513,2 +513,3 @@ import { Admin } from './admin';

RTTPingerOptions,
RTTSampler,
ServerMonitoringMode

@@ -515,0 +516,0 @@ } from './sdam/monitor';

@@ -11,4 +11,10 @@ import { clearTimeout, setTimeout } from 'timers';

import { CancellationToken, TypedEventEmitter } from '../mongo_types';
import type { Callback, EventEmitterWithState } from '../utils';
import { calculateDurationInMs, makeStateMachine, now, ns } from '../utils';
import {
calculateDurationInMs,
type Callback,
type EventEmitterWithState,
makeStateMachine,
now,
ns
} from '../utils';
import { ServerType, STATE_CLOSED, STATE_CLOSING } from './common';

@@ -29,4 +35,2 @@ import {

const kCancellationToken = Symbol('cancellationToken');
/** @internal */
const kRoundTripTime = Symbol('roundTripTime');

@@ -105,2 +109,4 @@ const STATE_IDLE = 'idle';

override component = MongoLoggableComponent.TOPOLOGY;
/** @internal */
private rttSampler: RTTSampler;

@@ -127,2 +133,3 @@ constructor(server: Server, options: MonitorOptions) {

this.mongoLogger = this[kServer].topology.client?.mongoLogger;
this.rttSampler = new RTTSampler(10);

@@ -210,2 +217,22 @@ const cancellationToken = this[kCancellationToken];

}
get roundTripTime(): number {
return this.rttSampler.average();
}
get minRoundTripTime(): number {
return this.rttSampler.min();
}
get latestRtt(): number {
return this.rttSampler.last ?? 0; // FIXME: Check if this is acceptable
}
addRttSample(rtt: number) {
this.rttSampler.addSample(rtt);
}
clearRttSamples() {
this.rttSampler.clear();
}
}

@@ -224,2 +251,4 @@

monitor.connection = null;
monitor.clearRttSamples();
}

@@ -258,3 +287,2 @@

monitor.connection = null;
monitor.emitAndLogHeartbeat(

@@ -285,7 +313,11 @@ Server.SERVER_HEARTBEAT_FAILED,

// NOTE: here we use the latestRtt as this measurement corresponds with the value
// obtained for this successful heartbeat
const duration =
isAwaitable && monitor.rttPinger
? monitor.rttPinger.roundTripTime
? monitor.rttPinger.latestRtt ?? calculateDurationInMs(start)
: calculateDurationInMs(start);
monitor.addRttSample(duration);
monitor.emitAndLogHeartbeat(

@@ -339,9 +371,3 @@ Server.SERVER_HEARTBEAT_SUCCEEDED,

if (isAwaitable && monitor.rttPinger == null) {
monitor.rttPinger = new RTTPinger(
monitor[kCancellationToken],
Object.assign(
{ heartbeatFrequencyMS: monitor.options.heartbeatFrequencyMS },
monitor.connectOptions
)
);
monitor.rttPinger = new RTTPinger(monitor);
}

@@ -389,2 +415,4 @@

}
const duration = calculateDurationInMs(start);
monitor.addRttSample(duration);

@@ -398,3 +426,3 @@ monitor.connection = connection;

monitor.address,
calculateDurationInMs(start),
duration,
connection.hello,

@@ -472,21 +500,28 @@ useStreamingProtocol(monitor, connection.hello?.topologyVersion)

/** @internal */
[kRoundTripTime]: number;
[kMonitorId]: NodeJS.Timeout;
/** @internal */
[kMonitorId]: NodeJS.Timeout;
monitor: Monitor;
closed: boolean;
/** @internal */
latestRtt?: number;
constructor(cancellationToken: CancellationToken, options: RTTPingerOptions) {
constructor(monitor: Monitor) {
this.connection = undefined;
this[kCancellationToken] = cancellationToken;
this[kRoundTripTime] = 0;
this[kCancellationToken] = monitor[kCancellationToken];
this.closed = false;
this.monitor = monitor;
this.latestRtt = monitor.latestRtt;
const heartbeatFrequencyMS = options.heartbeatFrequencyMS;
this[kMonitorId] = setTimeout(() => measureRoundTripTime(this, options), heartbeatFrequencyMS);
const heartbeatFrequencyMS = monitor.options.heartbeatFrequencyMS;
this[kMonitorId] = setTimeout(() => this.measureRoundTripTime(), heartbeatFrequencyMS);
}
get roundTripTime(): number {
return this[kRoundTripTime];
return this.monitor.roundTripTime;
}
get minRoundTripTime(): number {
return this.monitor.minRoundTripTime;
}
close(): void {

@@ -499,15 +534,8 @@ this.closed = true;

}
}
function measureRoundTripTime(rttPinger: RTTPinger, options: RTTPingerOptions) {
const start = now();
options.cancellationToken = rttPinger[kCancellationToken];
const heartbeatFrequencyMS = options.heartbeatFrequencyMS;
if (rttPinger.closed) {
return;
}
function measureAndReschedule(conn?: Connection) {
if (rttPinger.closed) {
private measureAndReschedule(start?: number, conn?: Connection) {
if (start == null) {
start = now();
}
if (this.closed) {
conn?.destroy();

@@ -517,40 +545,46 @@ return;

if (rttPinger.connection == null) {
rttPinger.connection = conn;
if (this.connection == null) {
this.connection = conn;
}
rttPinger[kRoundTripTime] = calculateDurationInMs(start);
rttPinger[kMonitorId] = setTimeout(
() => measureRoundTripTime(rttPinger, options),
heartbeatFrequencyMS
this.latestRtt = calculateDurationInMs(start);
this[kMonitorId] = setTimeout(
() => this.measureRoundTripTime(),
this.monitor.options.heartbeatFrequencyMS
);
}
const connection = rttPinger.connection;
if (connection == null) {
private measureRoundTripTime() {
const start = now();
if (this.closed) {
return;
}
const connection = this.connection;
if (connection == null) {
// eslint-disable-next-line github/no-then
connect(this.monitor.connectOptions).then(
connection => {
this.measureAndReschedule(start, connection);
},
() => {
this.connection = undefined;
}
);
return;
}
const commandName =
connection.serverApi?.version || connection.helloOk ? 'hello' : LEGACY_HELLO_COMMAND;
// eslint-disable-next-line github/no-then
connect(options).then(
connection => {
measureAndReschedule(connection);
},
connection.command(ns('admin.$cmd'), { [commandName]: 1 }, undefined).then(
() => this.measureAndReschedule(),
() => {
rttPinger.connection = undefined;
rttPinger[kRoundTripTime] = 0;
this.connection?.destroy();
this.connection = undefined;
return;
}
);
return;
}
const commandName =
connection.serverApi?.version || connection.helloOk ? 'hello' : LEGACY_HELLO_COMMAND;
// eslint-disable-next-line github/no-then
connection.command(ns('admin.$cmd'), { [commandName]: 1 }, undefined).then(
() => measureAndReschedule(),
() => {
rttPinger.connection?.destroy();
rttPinger.connection = undefined;
rttPinger[kRoundTripTime] = 0;
return;
}
);
}

@@ -683,1 +717,80 @@

}
/** @internal
* This class implements the RTT sampling logic specified for [CSOT](https://github.com/mongodb/specifications/blob/bbb335e60cd7ea1e0f7cd9a9443cb95fc9d3b64d/source/client-side-operations-timeout/client-side-operations-timeout.md#drivers-use-minimum-rtt-to-short-circuit-operations)
*
* This is implemented as a [circular buffer](https://en.wikipedia.org/wiki/Circular_buffer) keeping
* the most recent `windowSize` samples
* */
export class RTTSampler {
/** Index of the next slot to be overwritten */
private writeIndex: number;
private length: number;
private rttSamples: Float64Array;
constructor(windowSize = 10) {
this.rttSamples = new Float64Array(windowSize);
this.length = 0;
this.writeIndex = 0;
}
/**
* Adds an rtt sample to the end of the circular buffer
* When `windowSize` samples have been collected, `addSample` overwrites the least recently added
* sample
*/
addSample(sample: number) {
this.rttSamples[this.writeIndex++] = sample;
if (this.length < this.rttSamples.length) {
this.length++;
}
this.writeIndex %= this.rttSamples.length;
}
/**
* When \< 2 samples have been collected, returns 0
* Otherwise computes the minimum value samples contained in the buffer
*/
min(): number {
if (this.length < 2) return 0;
let min = this.rttSamples[0];
for (let i = 1; i < this.length; i++) {
if (this.rttSamples[i] < min) min = this.rttSamples[i];
}
return min;
}
/**
* Returns mean of samples contained in the buffer
*/
average(): number {
if (this.length === 0) return 0;
let sum = 0;
for (let i = 0; i < this.length; i++) {
sum += this.rttSamples[i];
}
return sum / this.length;
}
/**
* Returns most recently inserted element in the buffer
* Returns null if the buffer is empty
* */
get last(): number | null {
if (this.length === 0) return null;
return this.rttSamples[this.writeIndex === 0 ? this.length - 1 : this.writeIndex - 1];
}
/**
* Clear the buffer
* NOTE: this does not overwrite the data held in the internal array, just the pointers into
* this array
*/
clear() {
this.length = 0;
this.writeIndex = 0;
}
}

@@ -36,4 +36,6 @@ import { type Document, Long, type ObjectId } from '../bson';

/** The round trip time to ping this server (in ms) */
/** The average round trip time to ping this server (in ms) */
roundTripTime?: number;
/** The minimum round trip time to ping this server over the past 10 samples(in ms) */
minRoundTripTime?: number;

@@ -62,2 +64,4 @@ /** If the client is in load balancing mode. */

roundTripTime: number;
/** The minimum measurement of the last 10 measurements of roundTripTime that have been collected */
minRoundTripTime: number;
lastUpdateTime: number;

@@ -103,2 +107,3 @@ lastWriteDate: number;

this.roundTripTime = options?.roundTripTime ?? -1;
this.minRoundTripTime = options?.minRoundTripTime ?? 0;
this.lastUpdateTime = now();

@@ -105,0 +110,0 @@ this.lastWriteDate = hello?.lastWrite?.lastWriteDate ?? 0;

@@ -226,5 +226,4 @@ import { MongoCompatibilityError, MongoInvalidArgumentError } from '../error';

const low = servers.reduce(
(min: number, server: ServerDescription) =>
min === -1 ? server.roundTripTime : Math.min(server.roundTripTime, min),
-1
(min: number, server: ServerDescription) => Math.min(server.roundTripTime, min),
Infinity
);

@@ -231,0 +230,0 @@

@@ -178,3 +178,4 @@ import type { Document } from '../bson';

new ServerDescription(this.description.hostAddress, event.reply, {
roundTripTime: calculateRoundTripTime(this.description.roundTripTime, event.duration)
roundTripTime: this.monitor?.roundTripTime,
minRoundTripTime: this.monitor?.minRoundTripTime
})

@@ -471,11 +472,2 @@ );

function calculateRoundTripTime(oldRtt: number, duration: number): number {
if (oldRtt === -1) {
return duration;
}
const alpha = 0.2;
return alpha * duration + (1 - alpha) * oldRtt;
}
function markServerUnknown(server: Server, error?: MongoError) {

@@ -482,0 +474,0 @@ // Load balancer servers can never be marked unknown.

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is too big to display

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc