tardis-dev
Advanced tools
Comparing version 7.7.7 to 7.7.8
"use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
const events_1 = require("events"); | ||
const stream_1 = require("stream"); | ||
const handy_1 = require("./handy"); | ||
const NODE_MAJOR_VERSION = Number(process.versions.node.split('.')[0]); | ||
const DATE_MAX = new Date(8640000000000000); | ||
@@ -29,9 +28,9 @@ async function nextWithIndex(iterator, index) { | ||
async function* combine(...iterators) { | ||
var _a; | ||
if (iterators.length === 0) { | ||
return; | ||
} | ||
let buffer = undefined; | ||
try { | ||
const firstReceivedResult = await iterators[0].next(); | ||
const nextResults = iterators.map(nextWithIndex); | ||
const { result } = await Promise.race(nextResults); | ||
const firstReceivedMessage = result.value; | ||
const now = new Date(); | ||
@@ -42,33 +41,16 @@ const THREE_MINUTES_IN_MS = 3 * 60 * handy_1.ONE_SEC_IN_MS; | ||
// alternative would be to provide isRealtime via param to combine fn, perhaps less magic?... | ||
const isRealTime = firstReceivedResult.value.localTimestamp.valueOf() + THREE_MINUTES_IN_MS > now.valueOf(); | ||
const isRealTime = firstReceivedMessage.localTimestamp.valueOf() + THREE_MINUTES_IN_MS > now.valueOf(); | ||
if (isRealTime) { | ||
yield firstReceivedResult.value; | ||
buffer = new stream_1.PassThrough({ | ||
objectMode: true, | ||
highWaterMark: 1024 | ||
}); | ||
iterators.forEach(async (messages) => { | ||
for await (const message of messages) { | ||
if (!buffer.write(message)) { | ||
//Handle backpressure on write | ||
await events_1.once(buffer, 'drain'); | ||
} | ||
} | ||
}); | ||
for await (const message of buffer) { | ||
yield message; | ||
if (NODE_MAJOR_VERSION < 13) { | ||
console.warn('Important! using combine for real-time streams may cause memory leaks due to Node.js issue with Promise.race handling - https://github.com/nodejs/node/issues/29385. Try installing Node v13 which seems to have this issue fixed.'); | ||
} | ||
while (true) { | ||
// return messages in FIFO order thanks to using Promise.race | ||
// based on https://github.com/fraxken/combine-async-iterators | ||
const { index, result } = await Promise.race(nextResults); | ||
yield result.value; | ||
nextResults[index] = nextWithIndex(iterators[index], index); | ||
} | ||
} | ||
else { | ||
// first item was already read so we need to 'put it back' as if it wasn't | ||
const nextResults = [ | ||
Promise.resolve({ | ||
index: 0, | ||
result: firstReceivedResult | ||
}) | ||
]; | ||
// add rest of the items to array we're work with later on | ||
for (var i = 1; i < iterators.length; i++) { | ||
nextResults[i] = nextWithIndex(iterators[i], i); | ||
} | ||
return yield* combineHistorical(iterators, nextResults); | ||
@@ -78,3 +60,2 @@ } | ||
finally { | ||
(_a = buffer) === null || _a === void 0 ? void 0 : _a.end(); | ||
// clean up - this will close open real-time connections | ||
@@ -81,0 +62,0 @@ for (const iterator of iterators) { |
@@ -63,3 +63,3 @@ "use strict"; | ||
this.debug('provided filters: %o mapped to subscribe messages: %o', this._filters, subscribeMessages); | ||
this._ws = new ws_1.default(this.wssURL, { perMessageDeflate: false, handshakeTimeout: 5 * handy_1.ONE_SEC_IN_MS }); | ||
this._ws = new ws_1.default(this.wssURL, { perMessageDeflate: false, handshakeTimeout: 10 * handy_1.ONE_SEC_IN_MS }); | ||
this._ws.onopen = this._onConnectionEstabilished; | ||
@@ -66,0 +66,0 @@ this._ws.onclose = this._onConnectionClosed; |
{ | ||
"name": "tardis-dev", | ||
"version": "7.7.7", | ||
"version": "7.7.8", | ||
"engines": { | ||
@@ -5,0 +5,0 @@ "node": ">=12" |
@@ -1,3 +0,1 @@ | ||
import { once } from 'events' | ||
import { PassThrough } from 'stream' | ||
import { ONE_SEC_IN_MS } from './handy' | ||
@@ -12,2 +10,4 @@ | ||
const NODE_MAJOR_VERSION = Number(process.versions.node.split('.')[0]) | ||
const DATE_MAX = new Date(8640000000000000) | ||
@@ -48,6 +48,8 @@ | ||
} | ||
let buffer: PassThrough | undefined = undefined | ||
try { | ||
const firstReceivedResult = await iterators[0].next() | ||
const nextResults = iterators.map(nextWithIndex) | ||
const { result } = await Promise.race(nextResults) | ||
const firstReceivedMessage = result.value | ||
const now = new Date() | ||
@@ -59,43 +61,23 @@ const THREE_MINUTES_IN_MS = 3 * 60 * ONE_SEC_IN_MS | ||
// alternative would be to provide isRealtime via param to combine fn, perhaps less magic?... | ||
const isRealTime = firstReceivedResult.value.localTimestamp.valueOf() + THREE_MINUTES_IN_MS > now.valueOf() | ||
const isRealTime = firstReceivedMessage.localTimestamp.valueOf() + THREE_MINUTES_IN_MS > now.valueOf() | ||
if (isRealTime) { | ||
yield firstReceivedResult.value as any | ||
if (NODE_MAJOR_VERSION < 13) { | ||
console.warn( | ||
'Important! using combine for real-time streams may cause memory leaks due to Node.js issue with Promise.race handling - https://github.com/nodejs/node/issues/29385. Try installing Node v13 which seems to have this issue fixed.' | ||
) | ||
} | ||
buffer = new PassThrough({ | ||
objectMode: true, | ||
highWaterMark: 1024 | ||
}) | ||
while (true) { | ||
// return messages in FIFO order thanks to using Promise.race | ||
// based on https://github.com/fraxken/combine-async-iterators | ||
const { index, result } = await Promise.race(nextResults) | ||
iterators.forEach(async messages => { | ||
for await (const message of messages) { | ||
if (!buffer!.write(message)) { | ||
//Handle backpressure on write | ||
await once(buffer!, 'drain') | ||
} | ||
} | ||
}) | ||
for await (const message of buffer as any) { | ||
yield message | ||
yield result.value as any | ||
nextResults[index] = nextWithIndex(iterators[index], index) | ||
} | ||
} else { | ||
// first item was already read so we need to 'put it back' as if it wasn't | ||
const nextResults = [ | ||
Promise.resolve({ | ||
index: 0, | ||
result: firstReceivedResult | ||
}) | ||
] | ||
// add rest of the items to array we're work with later on | ||
for (var i = 1; i < iterators.length; i++) { | ||
nextResults[i] = nextWithIndex(iterators[i], i) | ||
} | ||
return yield* combineHistorical(iterators, nextResults) as any | ||
} | ||
} finally { | ||
buffer?.end() | ||
// clean up - this will close open real-time connections | ||
@@ -102,0 +84,0 @@ for (const iterator of iterators) { |
@@ -42,3 +42,3 @@ import dbg from 'debug' | ||
this._ws = new WebSocket(this.wssURL, { perMessageDeflate: false, handshakeTimeout: 5 * ONE_SEC_IN_MS }) | ||
this._ws = new WebSocket(this.wssURL, { perMessageDeflate: false, handshakeTimeout: 10 * ONE_SEC_IN_MS }) | ||
@@ -45,0 +45,0 @@ this._ws.onopen = this._onConnectionEstabilished |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
588696
10373