tardis-machine
Advanced tools
Comparing version 1.3.3 to 1.3.4
@@ -34,2 +34,3 @@ /// <reference types="node" /> | ||
private _sendBatch; | ||
private _waitForEmptyBuffer; | ||
private _checkConnection; | ||
@@ -36,0 +37,0 @@ private _sendDataFeedIterable; |
@@ -9,2 +9,3 @@ "use strict"; | ||
const debug = debug_1.default('tardis-machine'); | ||
const wait = (delayMS) => new Promise(resolve => setTimeout(resolve, delayMS)); | ||
class ReplaySession { | ||
@@ -82,3 +83,3 @@ constructor() { | ||
while (!error && connection.bufferedAmount > 0) { | ||
await new Promise(resolve => setTimeout(resolve, 100)); | ||
await wait(100); | ||
} | ||
@@ -161,3 +162,3 @@ connection.close(error); | ||
if (buffered.length == 10) { | ||
this._sendBatch(buffered); | ||
await this._sendBatch(buffered); | ||
buffered = []; | ||
@@ -167,3 +168,3 @@ } | ||
if (buffered.length > 0) { | ||
this._sendBatch(buffered); | ||
await this._sendBatch(buffered); | ||
} | ||
@@ -174,4 +175,5 @@ } | ||
} | ||
_sendBatch(batch) { | ||
async _sendBatch(batch) { | ||
this._checkConnection(); | ||
await this._waitForEmptyBuffer(); | ||
for (let i = 0; i < batch.length; i++) { | ||
@@ -183,2 +185,10 @@ this._websocket.send(batch[i], { | ||
} | ||
async _waitForEmptyBuffer() { | ||
// wait until buffer is empty | ||
// sometimes slow clients can't keep up with messages arrival rate so we need to throttle | ||
// https://nodejs.org/api/net.html#net_socket_buffersize | ||
while (this.bufferedAmount > 0) { | ||
await wait(30); | ||
} | ||
} | ||
_checkConnection() { | ||
@@ -196,2 +206,3 @@ if (this._websocket.readyState != this._websocket.OPEN) { | ||
this._checkConnection(); | ||
await this._waitForEmptyBuffer(); | ||
this._websocket.send(message, { | ||
@@ -198,0 +209,0 @@ binary: false |
{ | ||
"name": "tardis-machine", | ||
"version": "1.3.3", | ||
"version": "1.3.4", | ||
"engines": { | ||
@@ -5,0 +5,0 @@ "node": ">=12" |
@@ -9,2 +9,4 @@ import { ParsedUrlQuery } from 'querystring' | ||
const wait = (delayMS: number) => new Promise(resolve => setTimeout(resolve, delayMS)) | ||
export class ReplaySession { | ||
@@ -94,3 +96,3 @@ private readonly _connections: WebsocketConnection[] = [] | ||
while (!error && connection.bufferedAmount > 0) { | ||
await new Promise(resolve => setTimeout(resolve, 100)) | ||
await wait(100) | ||
} | ||
@@ -156,5 +158,3 @@ | ||
throw new Error( | ||
`No subscriptions received for websocket connection, exchange ${this._replayOptions.exchange}, from ${ | ||
this._replayOptions.from | ||
}, to ${this._replayOptions.to}` | ||
`No subscriptions received for websocket connection, exchange ${this._replayOptions.exchange}, from ${this._replayOptions.from}, to ${this._replayOptions.to}` | ||
) | ||
@@ -196,3 +196,3 @@ } | ||
if (buffered.length == 10) { | ||
this._sendBatch(buffered) | ||
await this._sendBatch(buffered) | ||
buffered = [] | ||
@@ -203,3 +203,3 @@ } | ||
if (buffered.length > 0) { | ||
this._sendBatch(buffered) | ||
await this._sendBatch(buffered) | ||
} | ||
@@ -212,4 +212,6 @@ } | ||
private _sendBatch(batch: Buffer[]) { | ||
private async _sendBatch(batch: Buffer[]) { | ||
this._checkConnection() | ||
await this._waitForEmptyBuffer() | ||
for (let i = 0; i < batch.length; i++) { | ||
@@ -222,2 +224,11 @@ this._websocket.send(batch[i], { | ||
private async _waitForEmptyBuffer() { | ||
// wait until buffer is empty | ||
// sometimes slow clients can't keep up with messages arrival rate so we need to throttle | ||
// https://nodejs.org/api/net.html#net_socket_buffersize | ||
while (this.bufferedAmount > 0) { | ||
await wait(30) | ||
} | ||
} | ||
private _checkConnection() { | ||
@@ -237,2 +248,4 @@ if (this._websocket.readyState != this._websocket.OPEN) { | ||
this._checkConnection() | ||
await this._waitForEmptyBuffer() | ||
this._websocket.send(message, { | ||
@@ -239,0 +252,0 @@ binary: false |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
100733
1253