tardis-dev
Advanced tools
Comparing version 7.7.0 to 7.7.1
@@ -45,7 +45,3 @@ "use strict"; | ||
} | ||
let depthSnapshotResponse = (await got_1.default.get(`${this.httpURL}/depth?symbol=${symbol.toUpperCase()}&limit=1000`).json()); | ||
const snapshotIsStale = new Date(depthSnapshotResponse.T).getUTCSeconds() !== new Date(depthSnapshotResponse.E).getUTCSeconds(); | ||
if (snapshotIsStale) { | ||
depthSnapshotResponse = (await got_1.default.get(`${this.httpURL}/depth?symbol=${symbol.toUpperCase()}&limit=1000`).json()); | ||
} | ||
const depthSnapshotResponse = (await got_1.default.get(`${this.httpURL}/depth?symbol=${symbol.toUpperCase()}&limit=1000`).json()); | ||
const snapshot = { | ||
@@ -52,0 +48,0 @@ stream: `${symbol}@depthSnapshot`, |
@@ -16,3 +16,2 @@ import dbg from 'debug'; | ||
private _receivedMessagesCount; | ||
private _staleConnectionCheckIntervalId?; | ||
private _ws?; | ||
@@ -19,0 +18,0 @@ private static _connectionId; |
@@ -18,3 +18,2 @@ "use strict"; | ||
try { | ||
this._monitorConnectionIfStale(); | ||
const subscribeMessages = this.mapToSubscribeMessages(this._filters); | ||
@@ -48,4 +47,4 @@ let symbolsCount = this._filters.reduce((prev, curr) => { | ||
catch (e) { | ||
this.debug('providing manual snapshots error: %o, closing connection...', e); | ||
this._ws.terminate(); | ||
this.debug('providing manual snapshots error: %o', e); | ||
this._ws.emit('error', e); | ||
} | ||
@@ -63,2 +62,3 @@ }; | ||
async *_stream() { | ||
let timerId; | ||
try { | ||
@@ -74,2 +74,3 @@ const subscribeMessages = this.mapToSubscribeMessages(this._filters); | ||
this._ws.onclose = this._onConnectionClosed; | ||
timerId = this._monitorConnectionIfStale(); | ||
const realtimeMessagesStream = ws_1.default.createWebSocketStream(this._ws, { | ||
@@ -106,4 +107,4 @@ readableObjectMode: true, | ||
finally { | ||
if (this._staleConnectionCheckIntervalId !== undefined) { | ||
clearInterval(this._staleConnectionCheckIntervalId); | ||
if (timerId !== undefined) { | ||
clearInterval(timerId); | ||
} | ||
@@ -132,3 +133,3 @@ } | ||
// set up timer that checks against open, but stale connections that do not return any data | ||
this._staleConnectionCheckIntervalId = setInterval(() => { | ||
return setInterval(() => { | ||
if (this._ws === undefined) { | ||
@@ -135,0 +136,0 @@ return; |
@@ -46,75 +46,80 @@ "use strict"; | ||
}); | ||
// helper flag that helps us not yielding two subsequent undefined/disconnect messages | ||
let lastMessageWasUndefined = false; | ||
let currentSliceDate = new Date(fromDate); | ||
// iterate over every minute in <=from,to> date range | ||
// get cached slice paths, read them as file streams, decompress, split by new lines and yield as messages | ||
while (currentSliceDate < toDate) { | ||
const sliceKey = currentSliceDate.toISOString(); | ||
debug_1.debug('getting slice: %s, exchange: %s', sliceKey, exchange); | ||
let cachedSlicePath; | ||
while (cachedSlicePath === undefined) { | ||
cachedSlicePath = cachedSlicePaths.get(sliceKey); | ||
// if something went wrong with worker throw error it has returned (network issue, auth issue etc) | ||
if (workerError !== undefined) { | ||
throw workerError; | ||
try { | ||
// helper flag that helps us not yielding two subsequent undefined/disconnect messages | ||
let lastMessageWasUndefined = false; | ||
let currentSliceDate = new Date(fromDate); | ||
// iterate over every minute in <=from,to> date range | ||
// get cached slice paths, read them as file streams, decompress, split by new lines and yield as messages | ||
while (currentSliceDate < toDate) { | ||
const sliceKey = currentSliceDate.toISOString(); | ||
debug_1.debug('getting slice: %s, exchange: %s', sliceKey, exchange); | ||
let cachedSlicePath; | ||
while (cachedSlicePath === undefined) { | ||
cachedSlicePath = cachedSlicePaths.get(sliceKey); | ||
// if something went wrong with worker throw error it has returned (network issue, auth issue etc) | ||
if (workerError !== undefined) { | ||
throw workerError; | ||
} | ||
if (cachedSlicePath === undefined) { | ||
// if response for requested date is not ready yet wait 100ms and try again | ||
debug_1.debug('waiting for slice: %s, exchange: %s', sliceKey, exchange); | ||
await handy_1.wait(100); | ||
} | ||
} | ||
if (cachedSlicePath === undefined) { | ||
// if response for requested date is not ready yet wait 100ms and try again | ||
debug_1.debug('waiting for slice: %s, exchange: %s', sliceKey, exchange); | ||
await handy_1.wait(100); | ||
} | ||
} | ||
// response is a path to file on disk let' read it as stream | ||
const linesStream = fs_extra_1.createReadStream(cachedSlicePath, { highWaterMark: 128 * 1024 }) | ||
// unzip it | ||
.pipe(zlib_1.default.createGunzip({ chunkSize: 128 * 1024 })) | ||
// and split by new line | ||
.pipe(new binarysplit_1.BinarySplitStream()); | ||
let linesCount = 0; | ||
// date is always formatted to have lendth of 28 so we can skip looking for first space in line and use it | ||
// as hardcoded value | ||
const DATE_MESSAGE_SPLIT_INDEX = 28; | ||
for await (const line of linesStream) { | ||
const bufferLine = line; | ||
linesCount++; | ||
if (bufferLine.length > 0) { | ||
lastMessageWasUndefined = false; | ||
const localTimestampBuffer = bufferLine.slice(0, DATE_MESSAGE_SPLIT_INDEX); | ||
const messageBuffer = bufferLine.slice(DATE_MESSAGE_SPLIT_INDEX + 1); | ||
// as any due to https://github.com/Microsoft/TypeScript/issues/24929 | ||
if (skipDecoding === true) { | ||
yield { | ||
localTimestamp: localTimestampBuffer, | ||
message: messageBuffer | ||
}; | ||
// response is a path to file on disk let' read it as stream | ||
const linesStream = fs_extra_1.createReadStream(cachedSlicePath, { highWaterMark: 128 * 1024 }) | ||
// unzip it | ||
.pipe(zlib_1.default.createGunzip({ chunkSize: 128 * 1024 })) | ||
// and split by new line | ||
.pipe(new binarysplit_1.BinarySplitStream()); | ||
let linesCount = 0; | ||
// date is always formatted to have lendth of 28 so we can skip looking for first space in line and use it | ||
// as hardcoded value | ||
const DATE_MESSAGE_SPLIT_INDEX = 28; | ||
for await (const line of linesStream) { | ||
const bufferLine = line; | ||
linesCount++; | ||
if (bufferLine.length > 0) { | ||
lastMessageWasUndefined = false; | ||
const localTimestampBuffer = bufferLine.slice(0, DATE_MESSAGE_SPLIT_INDEX); | ||
const messageBuffer = bufferLine.slice(DATE_MESSAGE_SPLIT_INDEX + 1); | ||
// as any due to https://github.com/Microsoft/TypeScript/issues/24929 | ||
if (skipDecoding === true) { | ||
yield { | ||
localTimestamp: localTimestampBuffer, | ||
message: messageBuffer | ||
}; | ||
} | ||
else { | ||
yield { | ||
// when skipDecoding is not set, decode timestamp to Date and message to object | ||
localTimestamp: new Date(localTimestampBuffer.toString()), | ||
message: JSON.parse(messageBuffer) | ||
}; | ||
} | ||
// ignore empty lines unless withDisconnects is set to true | ||
// do not yield subsequent undefined messages | ||
} | ||
else { | ||
yield { | ||
// when skipDecoding is not set, decode timestamp to Date and message to object | ||
localTimestamp: new Date(localTimestampBuffer.toString()), | ||
message: JSON.parse(messageBuffer) | ||
}; | ||
else if (withDisconnects === true && lastMessageWasUndefined === false) { | ||
lastMessageWasUndefined = true; | ||
yield undefined; | ||
} | ||
// ignore empty lines unless withDisconnects is set to true | ||
// do not yield subsequent undefined messages | ||
} | ||
else if (withDisconnects === true && lastMessageWasUndefined === false) { | ||
// if slice was empty (no lines at all) yield undefined if flag is set | ||
// do not yield subsequent undefined messages eg: two empty slices produce single undefined/disconnect message | ||
if (linesCount === 0 && withDisconnects === true && lastMessageWasUndefined === false) { | ||
lastMessageWasUndefined = true; | ||
yield undefined; | ||
} | ||
debug_1.debug('processed slice: %s, exchange: %s, count: %d', sliceKey, exchange, linesCount); | ||
// remove slice key from the map as it's already processed | ||
cachedSlicePaths.delete(sliceKey); | ||
// move one minute forward | ||
currentSliceDate.setUTCMinutes(currentSliceDate.getUTCMinutes() + 1); | ||
} | ||
// if slice was empty (no lines at all) yield undefined if flag is set | ||
// do not yield subsequent undefined messages eg: two empty slices produce single undefined/disconnect message | ||
if (linesCount === 0 && withDisconnects === true && lastMessageWasUndefined === false) { | ||
lastMessageWasUndefined = true; | ||
yield undefined; | ||
} | ||
debug_1.debug('processed slice: %s, exchange: %s, count: %d', sliceKey, exchange, linesCount); | ||
// remove slice key from the map as it's already processed | ||
cachedSlicePaths.delete(sliceKey); | ||
// move one minute forward | ||
currentSliceDate.setUTCMinutes(currentSliceDate.getUTCMinutes() + 1); | ||
debug_1.debug('replay for exchange: %s finished - from: %s, to: %s, filters: %o', exchange, fromDate.toISOString(), toDate.toISOString(), filters); | ||
} | ||
debug_1.debug('replay for exchange: %s finished - from: %s, to: %s, filters: %o', exchange, fromDate.toISOString(), toDate.toISOString(), filters); | ||
finally { | ||
await worker.terminate(); | ||
} | ||
} | ||
@@ -121,0 +126,0 @@ exports.replay = replay; |
import { MapperFactory } from './mappers'; | ||
import { Disconnect, Exchange, FilterForExchange } from './types'; | ||
export declare function stream<T extends Exchange, U extends boolean = false>({ exchange, filters, timeoutIntervalMS, withDisconnects }: StreamOptions<T, U>): AsyncIterableIterator<U extends true ? { | ||
export declare function stream<T extends Exchange, U extends boolean = false>({ exchange, filters, timeoutIntervalMS, withDisconnects, onError }: StreamOptions<T, U>): AsyncIterableIterator<U extends true ? { | ||
localTimestamp: Date; | ||
@@ -10,3 +10,3 @@ message: any; | ||
}>; | ||
export declare function streamNormalized<T extends Exchange, U extends MapperFactory<T, any>[], Z extends boolean = false>({ exchange, symbols, timeoutIntervalMS, withDisconnectMessages }: StreamNormalizedOptions<T, Z>, ...normalizers: U): AsyncIterableIterator<Z extends true ? (U extends MapperFactory<infer _, infer X>[] ? X | Disconnect : never) : (U extends MapperFactory<infer _, infer X>[] ? X : never)>; | ||
export declare function streamNormalized<T extends Exchange, U extends MapperFactory<T, any>[], Z extends boolean = false>({ exchange, symbols, timeoutIntervalMS, withDisconnectMessages, onError }: StreamNormalizedOptions<T, Z>, ...normalizers: U): AsyncIterableIterator<Z extends true ? (U extends MapperFactory<infer _, infer X>[] ? X | Disconnect : never) : (U extends MapperFactory<infer _, infer X>[] ? X : never)>; | ||
export declare type StreamOptions<T extends Exchange, U extends boolean = false> = { | ||
@@ -17,2 +17,3 @@ exchange: T; | ||
withDisconnects?: U; | ||
onError?: (error: Error) => void; | ||
}; | ||
@@ -24,3 +25,4 @@ export declare type StreamNormalizedOptions<T extends Exchange, U extends boolean = false> = { | ||
withDisconnectMessages?: U; | ||
onError?: (error: Error) => void; | ||
}; | ||
//# sourceMappingURL=stream.d.ts.map |
@@ -6,3 +6,3 @@ "use strict"; | ||
const realtimefeeds_1 = require("./realtimefeeds"); | ||
async function* stream({ exchange, filters, timeoutIntervalMS = 10000, withDisconnects = undefined }) { | ||
async function* stream({ exchange, filters, timeoutIntervalMS = 10000, withDisconnects = undefined, onError = undefined }) { | ||
validateStreamOptions(filters); | ||
@@ -30,10 +30,18 @@ let retries = 0; | ||
catch (error) { | ||
if (onError !== undefined) { | ||
onError(error); | ||
} | ||
retries++; | ||
const MAX_DELAY = 16 * 1000; | ||
const isRateLimited = error.message.includes('429'); | ||
const expontent = isRateLimited ? retries + 4 : retries - 1; | ||
let delay = Math.pow(2, expontent) * 1000; | ||
const MAX_DELAY = 32 * 1000; | ||
if (delay > MAX_DELAY) { | ||
delay = MAX_DELAY; | ||
let delay; | ||
if (isRateLimited) { | ||
delay = MAX_DELAY * retries; | ||
} | ||
else { | ||
delay = Math.pow(2, retries - 1) * 1000; | ||
if (delay > MAX_DELAY) { | ||
delay = MAX_DELAY; | ||
} | ||
} | ||
debug_1.debug('%s real-time feed connection error, retries count: %d, next retry delay: %dms, error message: %o', exchange, retries, delay, error); | ||
@@ -48,3 +56,3 @@ if (withDisconnects) { | ||
exports.stream = stream; | ||
async function* streamNormalized({ exchange, symbols, timeoutIntervalMS = 10000, withDisconnectMessages = undefined }, ...normalizers) { | ||
async function* streamNormalized({ exchange, symbols, timeoutIntervalMS = 10000, withDisconnectMessages = undefined, onError = undefined }, ...normalizers) { | ||
// mappers assume that symbols are uppercased by default | ||
@@ -64,3 +72,4 @@ // if user by mistake provide lowercase one let's automatically fix it | ||
timeoutIntervalMS, | ||
filters | ||
filters, | ||
onError | ||
}); | ||
@@ -73,2 +82,5 @@ const normalizedMessages = handy_1.normalizeMessages(exchange, messages, mappers, createMappers, withDisconnectMessages); | ||
catch (error) { | ||
if (onError !== undefined) { | ||
onError(error); | ||
} | ||
debug_1.debug('%s normalize messages error: %o, retrying with new connection...', exchange, error); | ||
@@ -75,0 +87,0 @@ if (withDisconnectMessages) { |
{ | ||
"name": "tardis-dev", | ||
"version": "7.7.0", | ||
"version": "7.7.1", | ||
"engines": { | ||
@@ -5,0 +5,0 @@ "node": ">=12" |
@@ -48,7 +48,3 @@ import got from 'got' | ||
let depthSnapshotResponse = (await got.get(`${this.httpURL}/depth?symbol=${symbol.toUpperCase()}&limit=1000`).json()) as any | ||
const snapshotIsStale = new Date(depthSnapshotResponse.T).getUTCSeconds() !== new Date(depthSnapshotResponse.E).getUTCSeconds() | ||
if (snapshotIsStale) { | ||
depthSnapshotResponse = (await got.get(`${this.httpURL}/depth?symbol=${symbol.toUpperCase()}&limit=1000`).json()) as any | ||
} | ||
const depthSnapshotResponse = (await got.get(`${this.httpURL}/depth?symbol=${symbol.toUpperCase()}&limit=1000`).json()) as any | ||
@@ -55,0 +51,0 @@ const snapshot = { |
@@ -21,3 +21,2 @@ import dbg from 'debug' | ||
private _receivedMessagesCount = 0 | ||
private _staleConnectionCheckIntervalId?: NodeJS.Timeout | ||
private _ws?: WebSocket | ||
@@ -36,2 +35,3 @@ private static _connectionId = 0 | ||
private async *_stream() { | ||
let timerId | ||
try { | ||
@@ -52,2 +52,4 @@ const subscribeMessages = this.mapToSubscribeMessages(this._filters) | ||
timerId = this._monitorConnectionIfStale() | ||
const realtimeMessagesStream = (WebSocket as any).createWebSocketStream(this._ws, { | ||
@@ -91,4 +93,4 @@ readableObjectMode: true, // othwerwise we may end up with multiple messages returned by stream in single iteration | ||
} finally { | ||
if (this._staleConnectionCheckIntervalId !== undefined) { | ||
clearInterval(this._staleConnectionCheckIntervalId) | ||
if (timerId !== undefined) { | ||
clearInterval(timerId) | ||
} | ||
@@ -130,3 +132,3 @@ } | ||
// set up timer that checks against open, but stale connections that do not return any data | ||
this._staleConnectionCheckIntervalId = setInterval(() => { | ||
return setInterval(() => { | ||
if (this._ws === undefined) { | ||
@@ -146,4 +148,2 @@ return | ||
try { | ||
this._monitorConnectionIfStale() | ||
const subscribeMessages = this.mapToSubscribeMessages(this._filters) | ||
@@ -183,4 +183,4 @@ | ||
} catch (e) { | ||
this.debug('providing manual snapshots error: %o, closing connection...', e) | ||
this._ws!.terminate() | ||
this.debug('providing manual snapshots error: %o', e) | ||
this._ws!.emit('error', e) | ||
} | ||
@@ -187,0 +187,0 @@ } |
@@ -70,84 +70,94 @@ import { createReadStream } from 'fs-extra' | ||
// helper flag that helps us not yielding two subsequent undefined/disconnect messages | ||
let lastMessageWasUndefined = false | ||
try { | ||
// helper flag that helps us not yielding two subsequent undefined/disconnect messages | ||
let lastMessageWasUndefined = false | ||
let currentSliceDate = new Date(fromDate) | ||
// iterate over every minute in <=from,to> date range | ||
// get cached slice paths, read them as file streams, decompress, split by new lines and yield as messages | ||
while (currentSliceDate < toDate) { | ||
const sliceKey = currentSliceDate.toISOString() | ||
let currentSliceDate = new Date(fromDate) | ||
// iterate over every minute in <=from,to> date range | ||
// get cached slice paths, read them as file streams, decompress, split by new lines and yield as messages | ||
while (currentSliceDate < toDate) { | ||
const sliceKey = currentSliceDate.toISOString() | ||
debug('getting slice: %s, exchange: %s', sliceKey, exchange) | ||
debug('getting slice: %s, exchange: %s', sliceKey, exchange) | ||
let cachedSlicePath | ||
while (cachedSlicePath === undefined) { | ||
cachedSlicePath = cachedSlicePaths.get(sliceKey) | ||
let cachedSlicePath | ||
while (cachedSlicePath === undefined) { | ||
cachedSlicePath = cachedSlicePaths.get(sliceKey) | ||
// if something went wrong with worker throw error it has returned (network issue, auth issue etc) | ||
if (workerError !== undefined) { | ||
throw workerError | ||
} | ||
// if something went wrong with worker throw error it has returned (network issue, auth issue etc) | ||
if (workerError !== undefined) { | ||
throw workerError | ||
} | ||
if (cachedSlicePath === undefined) { | ||
// if response for requested date is not ready yet wait 100ms and try again | ||
debug('waiting for slice: %s, exchange: %s', sliceKey, exchange) | ||
await wait(100) | ||
if (cachedSlicePath === undefined) { | ||
// if response for requested date is not ready yet wait 100ms and try again | ||
debug('waiting for slice: %s, exchange: %s', sliceKey, exchange) | ||
await wait(100) | ||
} | ||
} | ||
} | ||
// response is a path to file on disk let' read it as stream | ||
const linesStream = createReadStream(cachedSlicePath, { highWaterMark: 128 * 1024 }) | ||
// unzip it | ||
.pipe(zlib.createGunzip({ chunkSize: 128 * 1024 })) | ||
// and split by new line | ||
.pipe(new BinarySplitStream()) | ||
// response is a path to file on disk let' read it as stream | ||
const linesStream = createReadStream(cachedSlicePath, { highWaterMark: 128 * 1024 }) | ||
// unzip it | ||
.pipe(zlib.createGunzip({ chunkSize: 128 * 1024 })) | ||
// and split by new line | ||
.pipe(new BinarySplitStream()) | ||
let linesCount = 0 | ||
// date is always formatted to have lendth of 28 so we can skip looking for first space in line and use it | ||
// as hardcoded value | ||
const DATE_MESSAGE_SPLIT_INDEX = 28 | ||
let linesCount = 0 | ||
// date is always formatted to have lendth of 28 so we can skip looking for first space in line and use it | ||
// as hardcoded value | ||
const DATE_MESSAGE_SPLIT_INDEX = 28 | ||
for await (const line of linesStream) { | ||
const bufferLine = line as Buffer | ||
linesCount++ | ||
if (bufferLine.length > 0) { | ||
lastMessageWasUndefined = false | ||
const localTimestampBuffer = bufferLine.slice(0, DATE_MESSAGE_SPLIT_INDEX) | ||
const messageBuffer = bufferLine.slice(DATE_MESSAGE_SPLIT_INDEX + 1) | ||
// as any due to https://github.com/Microsoft/TypeScript/issues/24929 | ||
if (skipDecoding === true) { | ||
yield { | ||
localTimestamp: localTimestampBuffer, | ||
message: messageBuffer | ||
} as any | ||
} else { | ||
yield { | ||
// when skipDecoding is not set, decode timestamp to Date and message to object | ||
localTimestamp: new Date(localTimestampBuffer.toString()), | ||
message: JSON.parse(messageBuffer as any) | ||
} as any | ||
for await (const line of linesStream) { | ||
const bufferLine = line as Buffer | ||
linesCount++ | ||
if (bufferLine.length > 0) { | ||
lastMessageWasUndefined = false | ||
const localTimestampBuffer = bufferLine.slice(0, DATE_MESSAGE_SPLIT_INDEX) | ||
const messageBuffer = bufferLine.slice(DATE_MESSAGE_SPLIT_INDEX + 1) | ||
// as any due to https://github.com/Microsoft/TypeScript/issues/24929 | ||
if (skipDecoding === true) { | ||
yield { | ||
localTimestamp: localTimestampBuffer, | ||
message: messageBuffer | ||
} as any | ||
} else { | ||
yield { | ||
// when skipDecoding is not set, decode timestamp to Date and message to object | ||
localTimestamp: new Date(localTimestampBuffer.toString()), | ||
message: JSON.parse(messageBuffer as any) | ||
} as any | ||
} | ||
// ignore empty lines unless withDisconnects is set to true | ||
// do not yield subsequent undefined messages | ||
} else if (withDisconnects === true && lastMessageWasUndefined === false) { | ||
lastMessageWasUndefined = true | ||
yield undefined as any | ||
} | ||
// ignore empty lines unless withDisconnects is set to true | ||
// do not yield subsequent undefined messages | ||
} else if (withDisconnects === true && lastMessageWasUndefined === false) { | ||
} | ||
// if slice was empty (no lines at all) yield undefined if flag is set | ||
// do not yield subsequent undefined messages eg: two empty slices produce single undefined/disconnect message | ||
if (linesCount === 0 && withDisconnects === true && lastMessageWasUndefined === false) { | ||
lastMessageWasUndefined = true | ||
yield undefined as any | ||
} | ||
debug('processed slice: %s, exchange: %s, count: %d', sliceKey, exchange, linesCount) | ||
// remove slice key from the map as it's already processed | ||
cachedSlicePaths.delete(sliceKey) | ||
// move one minute forward | ||
currentSliceDate.setUTCMinutes(currentSliceDate.getUTCMinutes() + 1) | ||
} | ||
// if slice was empty (no lines at all) yield undefined if flag is set | ||
// do not yield subsequent undefined messages eg: two empty slices produce single undefined/disconnect message | ||
if (linesCount === 0 && withDisconnects === true && lastMessageWasUndefined === false) { | ||
lastMessageWasUndefined = true | ||
yield undefined as any | ||
} | ||
debug('processed slice: %s, exchange: %s, count: %d', sliceKey, exchange, linesCount) | ||
// remove slice key from the map as it's already processed | ||
cachedSlicePaths.delete(sliceKey) | ||
// move one minute forward | ||
currentSliceDate.setUTCMinutes(currentSliceDate.getUTCMinutes() + 1) | ||
debug( | ||
'replay for exchange: %s finished - from: %s, to: %s, filters: %o', | ||
exchange, | ||
fromDate.toISOString(), | ||
toDate.toISOString(), | ||
filters | ||
) | ||
} finally { | ||
await worker.terminate() | ||
} | ||
debug('replay for exchange: %s finished - from: %s, to: %s, filters: %o', exchange, fromDate.toISOString(), toDate.toISOString(), filters) | ||
} | ||
@@ -154,0 +164,0 @@ |
@@ -11,3 +11,4 @@ import { debug } from './debug' | ||
timeoutIntervalMS = 10000, | ||
withDisconnects = undefined | ||
withDisconnects = undefined, | ||
onError = undefined | ||
}: StreamOptions<T, U>): AsyncIterableIterator< | ||
@@ -40,9 +41,20 @@ U extends true ? { localTimestamp: Date; message: any } | undefined : { localTimestamp: Date; message: any } | ||
} catch (error) { | ||
if (onError !== undefined) { | ||
onError(error) | ||
} | ||
retries++ | ||
const MAX_DELAY = 16 * 1000 | ||
const isRateLimited = error.message.includes('429') | ||
const expontent = isRateLimited ? retries + 4 : retries - 1 | ||
let delay = Math.pow(2, expontent) * 1000 | ||
const MAX_DELAY = 32 * 1000 | ||
if (delay > MAX_DELAY) { | ||
delay = MAX_DELAY | ||
let delay | ||
if (isRateLimited) { | ||
delay = MAX_DELAY * retries | ||
} else { | ||
delay = Math.pow(2, retries - 1) * 1000 | ||
if (delay > MAX_DELAY) { | ||
delay = MAX_DELAY | ||
} | ||
} | ||
@@ -68,3 +80,3 @@ | ||
export async function* streamNormalized<T extends Exchange, U extends MapperFactory<T, any>[], Z extends boolean = false>( | ||
{ exchange, symbols, timeoutIntervalMS = 10000, withDisconnectMessages = undefined }: StreamNormalizedOptions<T, Z>, | ||
{ exchange, symbols, timeoutIntervalMS = 10000, withDisconnectMessages = undefined, onError = undefined }: StreamNormalizedOptions<T, Z>, | ||
...normalizers: U | ||
@@ -92,6 +104,8 @@ ): AsyncIterableIterator< | ||
timeoutIntervalMS, | ||
filters | ||
filters, | ||
onError | ||
}) | ||
const normalizedMessages = normalizeMessages(exchange, messages, mappers, createMappers, withDisconnectMessages) | ||
for await (const message of normalizedMessages) { | ||
@@ -101,2 +115,5 @@ yield message | ||
} catch (error) { | ||
if (onError !== undefined) { | ||
onError(error) | ||
} | ||
debug('%s normalize messages error: %o, retrying with new connection...', exchange, error) | ||
@@ -136,2 +153,3 @@ if (withDisconnectMessages) { | ||
withDisconnects?: U | ||
onError?: (error: Error) => void | ||
} | ||
@@ -144,2 +162,3 @@ | ||
withDisconnectMessages?: U | ||
onError?: (error: Error) => void | ||
} |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
571016
10045