Socket
Socket
Sign inDemoInstall

mongodb

Package Overview
Dependencies
217
Maintainers
8
Versions
502
Alerts
File Explorer

Advanced tools

Install Socket

Detect and block malicious and high-risk dependencies

Install

Comparing version 6.3.0-dev.20240220.sha.90cb6fa to 6.3.0-dev.20240221.sha.38742c2

36

lib/cmap/connection.js

@@ -41,2 +41,4 @@ "use strict";

super();
this.lastHelloMS = -1;
this.helloOk = false;
this.delayedTimeoutId = null;

@@ -55,3 +57,12 @@ this.clusterTime = null;

this.socket = stream;
// TODO: Remove signal from connection layer
this.controller = new AbortController();
const { signal } = this.controller;
this.signal = signal;
const { promise: aborted, reject } = (0, utils_1.promiseWithResolvers)();
aborted.then(undefined, () => null); // Prevent unhandled rejection
this.signal.addEventListener('abort', function onAbort() {
reject(signal.reason);
}, { once: true });
this.aborted = aborted;
this.messageStream = this.socket

@@ -65,3 +76,3 @@ .on('error', this.onError.bind(this))

this.socketWrite = async (buffer) => {
return (0, utils_1.abortable)(socketWrite(buffer), { signal: this.controller.signal });
return Promise.race([socketWrite(buffer), this.aborted]);
};

@@ -71,3 +82,3 @@ }

get closed() {
return this.controller.signal.aborted;
return this.signal.aborted;
}

@@ -210,3 +221,3 @@ get hello() {

async *sendWire(message, options) {
this.controller.signal.throwIfAborted();
this.throwIfAborted();
if (typeof options.socketTimeoutMS === 'number') {

@@ -227,3 +238,3 @@ this.socket.setTimeout(options.socketTimeoutMS);

}
this.controller.signal.throwIfAborted();
this.throwIfAborted();
for await (const response of this.readMany()) {

@@ -244,3 +255,3 @@ this.socket.setTimeout(0);

yield document;
this.controller.signal.throwIfAborted();
this.throwIfAborted();
if (typeof options.socketTimeoutMS === 'number') {

@@ -267,3 +278,3 @@ this.socket.setTimeout(options.socketTimeoutMS);

try {
this.controller.signal.throwIfAborted();
this.throwIfAborted();
for await (document of this.sendWire(message, options)) {

@@ -281,3 +292,3 @@ if (!Buffer.isBuffer(document) && document.writeConcernError) {

yield document;
this.controller.signal.throwIfAborted();
this.throwIfAborted();
}

@@ -298,3 +309,3 @@ }

async command(ns, command, options = {}) {
this.controller.signal.throwIfAborted();
this.throwIfAborted();
for await (const document of this.sendCommand(ns, command, options)) {

@@ -307,6 +318,6 @@ return document;

const exhaustLoop = async () => {
this.controller.signal.throwIfAborted();
this.throwIfAborted();
for await (const reply of this.sendCommand(ns, command, options)) {
replyListener(undefined, reply);
this.controller.signal.throwIfAborted();
this.throwIfAborted();
}

@@ -317,2 +328,5 @@ throw new error_1.MongoUnexpectedServerResponseError('Server ended moreToCome unexpectedly');

}
throwIfAborted() {
this.signal.throwIfAborted();
}
/**

@@ -344,3 +358,3 @@ * @internal

async *readMany() {
for await (const message of (0, on_data_1.onData)(this.messageStream, { signal: this.controller.signal })) {
for await (const message of (0, on_data_1.onData)(this.messageStream, { signal: this.signal })) {
const response = await (0, compression_1.decompressResponse)(message);

@@ -347,0 +361,0 @@ yield response;

"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.COSMOS_DB_CHECK = exports.DOCUMENT_DB_CHECK = exports.TimeoutController = exports.request = exports.matchesParentDomain = exports.parseUnsignedInteger = exports.parseInteger = exports.compareObjectId = exports.commandSupportsReadConcern = exports.shuffle = exports.supportsRetryableWrites = exports.enumToString = exports.emitWarningOnce = exports.emitWarning = exports.MONGODB_WARNING_CODE = exports.DEFAULT_PK_FACTORY = exports.HostAddress = exports.BufferPool = exports.List = exports.deepCopy = exports.isRecord = exports.setDifference = exports.isHello = exports.isSuperset = exports.resolveOptions = exports.hasAtomicOperators = exports.calculateDurationInMs = exports.now = exports.makeStateMachine = exports.errorStrictEqual = exports.arrayStrictEqual = exports.eachAsync = exports.maxWireVersion = exports.uuidV4 = exports.makeCounter = exports.MongoDBCollectionNamespace = exports.MongoDBNamespace = exports.ns = exports.getTopology = exports.decorateWithExplain = exports.decorateWithReadConcern = exports.decorateWithCollation = exports.isPromiseLike = exports.applyRetryableWrites = exports.filterOptions = exports.mergeOptions = exports.isObject = exports.normalizeHintField = exports.hostMatchesWildcards = exports.ByteUtils = void 0;
exports.promiseWithResolvers = exports.abortable = exports.isHostMatch = exports.COSMOS_DB_MSG = exports.DOCUMENT_DB_MSG = void 0;
exports.promiseWithResolvers = exports.isHostMatch = exports.COSMOS_DB_MSG = exports.DOCUMENT_DB_MSG = void 0;
const crypto = require("crypto");

@@ -1032,29 +1032,2 @@ const http = require("http");

exports.isHostMatch = isHostMatch;
/**
* Takes a promise and races it with a promise wrapping the abort event of the optionally provided signal.
* The given promise is _always_ ordered before the signal's abort promise.
* When given an already rejected promise and an already aborted signal, the promise's rejection takes precedence.
*
* @see https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Promise/race
*
* @param promise - A promise to discard if the signal aborts
* @param options - An options object carrying an optional signal
*/
async function abortable(promise, { signal }) {
const { promise: aborted, reject } = promiseWithResolvers();
function rejectOnAbort() {
reject(signal.reason);
}
if (signal.aborted)
rejectOnAbort();
else
signal.addEventListener('abort', rejectOnAbort, { once: true });
try {
return await Promise.race([promise, aborted]);
}
finally {
signal.removeEventListener('abort', rejectOnAbort);
}
}
exports.abortable = abortable;
function promiseWithResolvers() {

@@ -1061,0 +1034,0 @@ let resolve;

{
"name": "mongodb",
"version": "6.3.0-dev.20240220.sha.90cb6fa",
"version": "6.3.0-dev.20240221.sha.38742c2",
"description": "The official MongoDB driver for Node.js",

@@ -5,0 +5,0 @@ "main": "lib/index.js",

@@ -32,3 +32,2 @@ import { type Readable, Transform, type TransformCallback } from 'stream';

import {
abortable,
BufferPool,

@@ -41,2 +40,3 @@ calculateDurationInMs,

now,
promiseWithResolvers,
uuidV4

@@ -166,5 +166,5 @@ } from '../utils';

public address: string;
public lastHelloMS?: number;
public lastHelloMS = -1;
public serverApi?: ServerApi;
public helloOk?: boolean;
public helloOk = false;
public authContext?: AuthContext;

@@ -175,3 +175,2 @@ public delayedTimeoutId: NodeJS.Timeout | null = null;

/**
* @public
* Represents if the connection has been established:

@@ -187,12 +186,13 @@ * - TCP handshake

private lastUseTime: number;
private socketTimeoutMS: number;
private monitorCommands: boolean;
private socket: Stream;
private controller: AbortController;
private messageStream: Readable;
private socketWrite: (buffer: Uint8Array) => Promise<void>;
private clusterTime: Document | null = null;
/** @internal */
override mongoLogger: MongoLogger | undefined;
private readonly socketTimeoutMS: number;
private readonly monitorCommands: boolean;
private readonly socket: Stream;
private readonly controller: AbortController;
private readonly signal: AbortSignal;
private readonly messageStream: Readable;
private readonly socketWrite: (buffer: Uint8Array) => Promise<void>;
private readonly aborted: Promise<never>;
/** @event */

@@ -229,3 +229,17 @@ static readonly COMMAND_STARTED = COMMAND_STARTED;

this.socket = stream;
// TODO: Remove signal from connection layer
this.controller = new AbortController();
const { signal } = this.controller;
this.signal = signal;
const { promise: aborted, reject } = promiseWithResolvers<never>();
aborted.then(undefined, () => null); // Prevent unhandled rejection
this.signal.addEventListener(
'abort',
function onAbort() {
reject(signal.reason);
},
{ once: true }
);
this.aborted = aborted;

@@ -241,3 +255,3 @@ this.messageStream = this.socket

this.socketWrite = async buffer => {
return abortable(socketWrite(buffer), { signal: this.controller.signal });
return Promise.race([socketWrite(buffer), this.aborted]);
};

@@ -248,3 +262,3 @@ }

public get closed(): boolean {
return this.controller.signal.aborted;
return this.signal.aborted;
}

@@ -418,3 +432,3 @@

private async *sendWire(message: WriteProtocolMessageType, options: CommandOptions) {
this.controller.signal.throwIfAborted();
this.throwIfAborted();

@@ -438,3 +452,3 @@ if (typeof options.socketTimeoutMS === 'number') {

this.controller.signal.throwIfAborted();
this.throwIfAborted();

@@ -460,3 +474,3 @@ for await (const response of this.readMany()) {

yield document;
this.controller.signal.throwIfAborted();
this.throwIfAborted();

@@ -495,3 +509,3 @@ if (typeof options.socketTimeoutMS === 'number') {

try {
this.controller.signal.throwIfAborted();
this.throwIfAborted();
for await (document of this.sendWire(message, options)) {

@@ -526,3 +540,3 @@ if (!Buffer.isBuffer(document) && document.writeConcernError) {

yield document;
this.controller.signal.throwIfAborted();
this.throwIfAborted();
}

@@ -570,3 +584,3 @@ } catch (error) {

): Promise<Document> {
this.controller.signal.throwIfAborted();
this.throwIfAborted();
for await (const document of this.sendCommand(ns, command, options)) {

@@ -585,6 +599,6 @@ return document;

const exhaustLoop = async () => {
this.controller.signal.throwIfAborted();
this.throwIfAborted();
for await (const reply of this.sendCommand(ns, command, options)) {
replyListener(undefined, reply);
this.controller.signal.throwIfAborted();
this.throwIfAborted();
}

@@ -596,2 +610,6 @@ throw new MongoUnexpectedServerResponseError('Server ended moreToCome unexpectedly');

private throwIfAborted() {
this.signal.throwIfAborted();
}
/**

@@ -630,3 +648,3 @@ * @internal

private async *readMany(): AsyncGenerator<OpMsgResponse | OpQueryResponse> {
for await (const message of onData(this.messageStream, { signal: this.controller.signal })) {
for await (const message of onData(this.messageStream, { signal: this.signal })) {
const response = await decompressResponse(message);

@@ -633,0 +651,0 @@ yield response;

@@ -1286,32 +1286,2 @@ import * as crypto from 'crypto';

/**
* Takes a promise and races it with a promise wrapping the abort event of the optionally provided signal.
* The given promise is _always_ ordered before the signal's abort promise.
* When given an already rejected promise and an already aborted signal, the promise's rejection takes precedence.
*
* @see https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Promise/race
*
* @param promise - A promise to discard if the signal aborts
* @param options - An options object carrying an optional signal
*/
export async function abortable<T>(
promise: Promise<T>,
{ signal }: { signal: AbortSignal }
): Promise<T> {
const { promise: aborted, reject } = promiseWithResolvers<never>();
function rejectOnAbort() {
reject(signal.reason);
}
if (signal.aborted) rejectOnAbort();
else signal.addEventListener('abort', rejectOnAbort, { once: true });
try {
return await Promise.race([promise, aborted]);
} finally {
signal.removeEventListener('abort', rejectOnAbort);
}
}
export function promiseWithResolvers<T>() {

@@ -1318,0 +1288,0 @@ let resolve!: Parameters<ConstructorParameters<typeof Promise<T>>[0]>[0];

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc