mongodb
Advanced tools
Comparing version 6.3.0-dev.20240220.sha.90cb6fa to 6.3.0-dev.20240221.sha.38742c2
@@ -41,2 +41,4 @@ "use strict"; | ||
super(); | ||
this.lastHelloMS = -1; | ||
this.helloOk = false; | ||
this.delayedTimeoutId = null; | ||
@@ -55,3 +57,12 @@ this.clusterTime = null; | ||
this.socket = stream; | ||
// TODO: Remove signal from connection layer | ||
this.controller = new AbortController(); | ||
const { signal } = this.controller; | ||
this.signal = signal; | ||
const { promise: aborted, reject } = (0, utils_1.promiseWithResolvers)(); | ||
aborted.then(undefined, () => null); // Prevent unhandled rejection | ||
this.signal.addEventListener('abort', function onAbort() { | ||
reject(signal.reason); | ||
}, { once: true }); | ||
this.aborted = aborted; | ||
this.messageStream = this.socket | ||
@@ -65,3 +76,3 @@ .on('error', this.onError.bind(this)) | ||
this.socketWrite = async (buffer) => { | ||
return (0, utils_1.abortable)(socketWrite(buffer), { signal: this.controller.signal }); | ||
return Promise.race([socketWrite(buffer), this.aborted]); | ||
}; | ||
@@ -71,3 +82,3 @@ } | ||
get closed() { | ||
return this.controller.signal.aborted; | ||
return this.signal.aborted; | ||
} | ||
@@ -210,3 +221,3 @@ get hello() { | ||
async *sendWire(message, options) { | ||
this.controller.signal.throwIfAborted(); | ||
this.throwIfAborted(); | ||
if (typeof options.socketTimeoutMS === 'number') { | ||
@@ -227,3 +238,3 @@ this.socket.setTimeout(options.socketTimeoutMS); | ||
} | ||
this.controller.signal.throwIfAborted(); | ||
this.throwIfAborted(); | ||
for await (const response of this.readMany()) { | ||
@@ -244,3 +255,3 @@ this.socket.setTimeout(0); | ||
yield document; | ||
this.controller.signal.throwIfAborted(); | ||
this.throwIfAborted(); | ||
if (typeof options.socketTimeoutMS === 'number') { | ||
@@ -267,3 +278,3 @@ this.socket.setTimeout(options.socketTimeoutMS); | ||
try { | ||
this.controller.signal.throwIfAborted(); | ||
this.throwIfAborted(); | ||
for await (document of this.sendWire(message, options)) { | ||
@@ -281,3 +292,3 @@ if (!Buffer.isBuffer(document) && document.writeConcernError) { | ||
yield document; | ||
this.controller.signal.throwIfAborted(); | ||
this.throwIfAborted(); | ||
} | ||
@@ -298,3 +309,3 @@ } | ||
async command(ns, command, options = {}) { | ||
this.controller.signal.throwIfAborted(); | ||
this.throwIfAborted(); | ||
for await (const document of this.sendCommand(ns, command, options)) { | ||
@@ -307,6 +318,6 @@ return document; | ||
const exhaustLoop = async () => { | ||
this.controller.signal.throwIfAborted(); | ||
this.throwIfAborted(); | ||
for await (const reply of this.sendCommand(ns, command, options)) { | ||
replyListener(undefined, reply); | ||
this.controller.signal.throwIfAborted(); | ||
this.throwIfAborted(); | ||
} | ||
@@ -317,2 +328,5 @@ throw new error_1.MongoUnexpectedServerResponseError('Server ended moreToCome unexpectedly'); | ||
} | ||
throwIfAborted() { | ||
this.signal.throwIfAborted(); | ||
} | ||
/** | ||
@@ -344,3 +358,3 @@ * @internal | ||
async *readMany() { | ||
for await (const message of (0, on_data_1.onData)(this.messageStream, { signal: this.controller.signal })) { | ||
for await (const message of (0, on_data_1.onData)(this.messageStream, { signal: this.signal })) { | ||
const response = await (0, compression_1.decompressResponse)(message); | ||
@@ -347,0 +361,0 @@ yield response; |
"use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
exports.COSMOS_DB_CHECK = exports.DOCUMENT_DB_CHECK = exports.TimeoutController = exports.request = exports.matchesParentDomain = exports.parseUnsignedInteger = exports.parseInteger = exports.compareObjectId = exports.commandSupportsReadConcern = exports.shuffle = exports.supportsRetryableWrites = exports.enumToString = exports.emitWarningOnce = exports.emitWarning = exports.MONGODB_WARNING_CODE = exports.DEFAULT_PK_FACTORY = exports.HostAddress = exports.BufferPool = exports.List = exports.deepCopy = exports.isRecord = exports.setDifference = exports.isHello = exports.isSuperset = exports.resolveOptions = exports.hasAtomicOperators = exports.calculateDurationInMs = exports.now = exports.makeStateMachine = exports.errorStrictEqual = exports.arrayStrictEqual = exports.eachAsync = exports.maxWireVersion = exports.uuidV4 = exports.makeCounter = exports.MongoDBCollectionNamespace = exports.MongoDBNamespace = exports.ns = exports.getTopology = exports.decorateWithExplain = exports.decorateWithReadConcern = exports.decorateWithCollation = exports.isPromiseLike = exports.applyRetryableWrites = exports.filterOptions = exports.mergeOptions = exports.isObject = exports.normalizeHintField = exports.hostMatchesWildcards = exports.ByteUtils = void 0; | ||
exports.promiseWithResolvers = exports.abortable = exports.isHostMatch = exports.COSMOS_DB_MSG = exports.DOCUMENT_DB_MSG = void 0; | ||
exports.promiseWithResolvers = exports.isHostMatch = exports.COSMOS_DB_MSG = exports.DOCUMENT_DB_MSG = void 0; | ||
const crypto = require("crypto"); | ||
@@ -1032,29 +1032,2 @@ const http = require("http"); | ||
exports.isHostMatch = isHostMatch; | ||
/** | ||
* Takes a promise and races it with a promise wrapping the abort event of the optionally provided signal. | ||
* The given promise is _always_ ordered before the signal's abort promise. | ||
* When given an already rejected promise and an already aborted signal, the promise's rejection takes precedence. | ||
* | ||
* @see https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Promise/race | ||
* | ||
* @param promise - A promise to discard if the signal aborts | ||
* @param options - An options object carrying an optional signal | ||
*/ | ||
async function abortable(promise, { signal }) { | ||
const { promise: aborted, reject } = promiseWithResolvers(); | ||
function rejectOnAbort() { | ||
reject(signal.reason); | ||
} | ||
if (signal.aborted) | ||
rejectOnAbort(); | ||
else | ||
signal.addEventListener('abort', rejectOnAbort, { once: true }); | ||
try { | ||
return await Promise.race([promise, aborted]); | ||
} | ||
finally { | ||
signal.removeEventListener('abort', rejectOnAbort); | ||
} | ||
} | ||
exports.abortable = abortable; | ||
function promiseWithResolvers() { | ||
@@ -1061,0 +1034,0 @@ let resolve; |
{ | ||
"name": "mongodb", | ||
"version": "6.3.0-dev.20240220.sha.90cb6fa", | ||
"version": "6.3.0-dev.20240221.sha.38742c2", | ||
"description": "The official MongoDB driver for Node.js", | ||
@@ -5,0 +5,0 @@ "main": "lib/index.js", |
@@ -32,3 +32,2 @@ import { type Readable, Transform, type TransformCallback } from 'stream'; | ||
import { | ||
abortable, | ||
BufferPool, | ||
@@ -41,2 +40,3 @@ calculateDurationInMs, | ||
now, | ||
promiseWithResolvers, | ||
uuidV4 | ||
@@ -166,5 +166,5 @@ } from '../utils'; | ||
public address: string; | ||
public lastHelloMS?: number; | ||
public lastHelloMS = -1; | ||
public serverApi?: ServerApi; | ||
public helloOk?: boolean; | ||
public helloOk = false; | ||
public authContext?: AuthContext; | ||
@@ -175,3 +175,2 @@ public delayedTimeoutId: NodeJS.Timeout | null = null; | ||
/** | ||
* @public | ||
* Represents if the connection has been established: | ||
@@ -187,12 +186,13 @@ * - TCP handshake | ||
private lastUseTime: number; | ||
private socketTimeoutMS: number; | ||
private monitorCommands: boolean; | ||
private socket: Stream; | ||
private controller: AbortController; | ||
private messageStream: Readable; | ||
private socketWrite: (buffer: Uint8Array) => Promise<void>; | ||
private clusterTime: Document | null = null; | ||
/** @internal */ | ||
override mongoLogger: MongoLogger | undefined; | ||
private readonly socketTimeoutMS: number; | ||
private readonly monitorCommands: boolean; | ||
private readonly socket: Stream; | ||
private readonly controller: AbortController; | ||
private readonly signal: AbortSignal; | ||
private readonly messageStream: Readable; | ||
private readonly socketWrite: (buffer: Uint8Array) => Promise<void>; | ||
private readonly aborted: Promise<never>; | ||
/** @event */ | ||
@@ -229,3 +229,17 @@ static readonly COMMAND_STARTED = COMMAND_STARTED; | ||
this.socket = stream; | ||
// TODO: Remove signal from connection layer | ||
this.controller = new AbortController(); | ||
const { signal } = this.controller; | ||
this.signal = signal; | ||
const { promise: aborted, reject } = promiseWithResolvers<never>(); | ||
aborted.then(undefined, () => null); // Prevent unhandled rejection | ||
this.signal.addEventListener( | ||
'abort', | ||
function onAbort() { | ||
reject(signal.reason); | ||
}, | ||
{ once: true } | ||
); | ||
this.aborted = aborted; | ||
@@ -241,3 +255,3 @@ this.messageStream = this.socket | ||
this.socketWrite = async buffer => { | ||
return abortable(socketWrite(buffer), { signal: this.controller.signal }); | ||
return Promise.race([socketWrite(buffer), this.aborted]); | ||
}; | ||
@@ -248,3 +262,3 @@ } | ||
public get closed(): boolean { | ||
return this.controller.signal.aborted; | ||
return this.signal.aborted; | ||
} | ||
@@ -418,3 +432,3 @@ | ||
private async *sendWire(message: WriteProtocolMessageType, options: CommandOptions) { | ||
this.controller.signal.throwIfAborted(); | ||
this.throwIfAborted(); | ||
@@ -438,3 +452,3 @@ if (typeof options.socketTimeoutMS === 'number') { | ||
this.controller.signal.throwIfAborted(); | ||
this.throwIfAborted(); | ||
@@ -460,3 +474,3 @@ for await (const response of this.readMany()) { | ||
yield document; | ||
this.controller.signal.throwIfAborted(); | ||
this.throwIfAborted(); | ||
@@ -495,3 +509,3 @@ if (typeof options.socketTimeoutMS === 'number') { | ||
try { | ||
this.controller.signal.throwIfAborted(); | ||
this.throwIfAborted(); | ||
for await (document of this.sendWire(message, options)) { | ||
@@ -526,3 +540,3 @@ if (!Buffer.isBuffer(document) && document.writeConcernError) { | ||
yield document; | ||
this.controller.signal.throwIfAborted(); | ||
this.throwIfAborted(); | ||
} | ||
@@ -570,3 +584,3 @@ } catch (error) { | ||
): Promise<Document> { | ||
this.controller.signal.throwIfAborted(); | ||
this.throwIfAborted(); | ||
for await (const document of this.sendCommand(ns, command, options)) { | ||
@@ -585,6 +599,6 @@ return document; | ||
const exhaustLoop = async () => { | ||
this.controller.signal.throwIfAborted(); | ||
this.throwIfAborted(); | ||
for await (const reply of this.sendCommand(ns, command, options)) { | ||
replyListener(undefined, reply); | ||
this.controller.signal.throwIfAborted(); | ||
this.throwIfAborted(); | ||
} | ||
@@ -596,2 +610,6 @@ throw new MongoUnexpectedServerResponseError('Server ended moreToCome unexpectedly'); | ||
private throwIfAborted() { | ||
this.signal.throwIfAborted(); | ||
} | ||
/** | ||
@@ -630,3 +648,3 @@ * @internal | ||
private async *readMany(): AsyncGenerator<OpMsgResponse | OpQueryResponse> { | ||
for await (const message of onData(this.messageStream, { signal: this.controller.signal })) { | ||
for await (const message of onData(this.messageStream, { signal: this.signal })) { | ||
const response = await decompressResponse(message); | ||
@@ -633,0 +651,0 @@ yield response; |
@@ -1286,32 +1286,2 @@ import * as crypto from 'crypto'; | ||
/** | ||
* Takes a promise and races it with a promise wrapping the abort event of the optionally provided signal. | ||
* The given promise is _always_ ordered before the signal's abort promise. | ||
* When given an already rejected promise and an already aborted signal, the promise's rejection takes precedence. | ||
* | ||
* @see https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Promise/race | ||
* | ||
* @param promise - A promise to discard if the signal aborts | ||
* @param options - An options object carrying an optional signal | ||
*/ | ||
export async function abortable<T>( | ||
promise: Promise<T>, | ||
{ signal }: { signal: AbortSignal } | ||
): Promise<T> { | ||
const { promise: aborted, reject } = promiseWithResolvers<never>(); | ||
function rejectOnAbort() { | ||
reject(signal.reason); | ||
} | ||
if (signal.aborted) rejectOnAbort(); | ||
else signal.addEventListener('abort', rejectOnAbort, { once: true }); | ||
try { | ||
return await Promise.race([promise, aborted]); | ||
} finally { | ||
signal.removeEventListener('abort', rejectOnAbort); | ||
} | ||
} | ||
export function promiseWithResolvers<T>() { | ||
@@ -1318,0 +1288,0 @@ let resolve!: Parameters<ConstructorParameters<typeof Promise<T>>[0]>[0]; |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
2934237
60514