tardis-dev
Comparing version 10.0.14 to 10.0.15
exchangedetails.d.ts
 import { Exchange, FilterForExchange } from './types';
 export declare function getExchangeDetails<T extends Exchange>(exchange: T): Promise<ExchangeDetails<T>>;
-export declare type ExchangeDetails<T extends Exchange> = {
+export declare type SymbolType = 'spot' | 'future' | 'perpetual' | 'option';
+export declare type Stats = {
+    trades: number;
+    bookChanges: number;
+};
+export declare type DatasetType = 'trades' | 'incremental_book_L2' | 'quotes' | 'derivative_ticker';
+declare type Datasets = {
+    dataTypes: DatasetType[];
+    formats: ['csv'];
+    exportedFrom: Date;
+    exportedUntil: Date;
+    stats: Stats;
+    symbols: {
+        id: string;
+        type: SymbolType;
+        availableSince: string;
+        availableTo: string;
+        stats: Stats;
+    }[];
+};
+export declare type ExchangeDetailsBase<T extends Exchange> = {
     id: T;
     name: string;
-    filterable: boolean;
+    enabled: boolean;
+    filterable: boolean;
     availableSince: string;
+    availableChannels: FilterForExchange[T]['channel'][];
     availableSymbols: {
         id: string;
-        type: 'spot' | 'future' | 'perpetual' | 'option';
+        type: SymbolType;
         availableSince: string;
         availableTo?: string;
         name?: string;
     }[];
-    availableChannels: FilterForExchange[T]['channel'][];
     incidentReports: {
         from: string;
         to: string;
-        status: string;
+        status: 'resolved' | 'wontfix';
         details: string;
     };
 };
+declare type ExchangeDetails<T extends Exchange> = (ExchangeDetailsBase<T> & {
+    supportsDatasets: false;
+}) | (ExchangeDetailsBase<T> & {
+    supportsDatasets: true;
+    datasets: Datasets;
+});
+export {};
 //# sourceMappingURL=exchangedetails.d.ts.map
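Since ExchangeDetails is now a discriminated union on supportsDatasets, callers have to narrow before touching datasets. A minimal sketch of that narrowing (assuming getExchangeDetails is re-exported from the package root; 'bitmex' is just an illustrative exchange id):

import { getExchangeDetails } from 'tardis-dev'

async function printDatasetsInfo() {
  const details = await getExchangeDetails('bitmex')
  if (details.supportsDatasets) {
    // narrowed branch: `datasets` exists only when supportsDatasets is true
    console.log(details.datasets.dataTypes, details.datasets.exportedFrom)
  } else {
    console.log(`${details.name} has no downloadable datasets`)
  }
}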
handy.d.ts
 import { Mapper } from './mappers';
-import { Exchange, FilterForExchange, Filter } from './types';
+import { Exchange, Filter, FilterForExchange } from './types';
 export declare function parseAsUTCDate(val: string): Date;
@@ -27,2 +27,8 @@ export declare function wait(delayMS: number): Promise<unknown>;
 export declare function optimizeFilters(filters: Filter<any>[]): Filter<any>[];
+export declare function download({ apiKey, downloadPath, url, userAgent }: {
+    url: string;
+    downloadPath: string;
+    userAgent: string;
+    apiKey: string;
+}): Promise<void>;
 //# sourceMappingURL=handy.d.ts.map
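For reference, a sketch of calling the new download helper directly, based only on the declaration above. The module path and all argument values are illustrative assumptions - download is an internal utility, not a documented public API:

import { download } from 'tardis-dev/dist/handy' // assumed build-output path, not a documented entry point

async function fetchOneFile() {
  await download({
    url: 'https://datasets.tardis.dev/v1/example', // placeholder URL
    downloadPath: './cache/example.json.gz',       // placeholder path
    userAgent: 'my-app/1.0.0',
    apiKey: process.env.TARDIS_API_KEY || ''       // empty string sends an empty Authorization header
  })
}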
"use strict"; | ||
var __importStar = (this && this.__importStar) || function (mod) { | ||
if (mod && mod.__esModule) return mod; | ||
var result = {}; | ||
if (mod != null) for (var k in mod) if (Object.hasOwnProperty.call(mod, k)) result[k] = mod[k]; | ||
result["default"] = mod; | ||
return result; | ||
}; | ||
var __importDefault = (this && this.__importDefault) || function (mod) { | ||
return (mod && mod.__esModule) ? mod : { "default": mod }; | ||
}; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
const crypto_1 = require("crypto"); | ||
const crypto_1 = __importStar(require("crypto")); | ||
const fs_extra_1 = require("fs-extra"); | ||
const https_1 = __importDefault(require("https")); | ||
const path_1 = __importDefault(require("path")); | ||
const debug_1 = require("./debug"); | ||
function parseAsUTCDate(val) { | ||
@@ -197,2 +211,93 @@ // not sure about this one, but it should force parsing date as UTC date not as local timezone | ||
exports.optimizeFilters = optimizeFilters; | ||
const httpsAgent = new https_1.default.Agent({ | ||
keepAlive: true, | ||
keepAliveMsecs: 10 * exports.ONE_SEC_IN_MS | ||
}); | ||
async function download({ apiKey, downloadPath, url, userAgent }) { | ||
const httpRequestOptions = { | ||
agent: httpsAgent, | ||
timeout: 45 * exports.ONE_SEC_IN_MS, | ||
headers: { | ||
'Accept-Encoding': 'gzip', | ||
'User-Agent': userAgent, | ||
Authorization: apiKey ? `Bearer ${apiKey}` : '' | ||
} | ||
}; | ||
const MAX_ATTEMPTS = 5; | ||
let attempts = 0; | ||
while (true) { | ||
// simple retry logic when fetching from the network... | ||
attempts++; | ||
try { | ||
return await _downloadFile(httpRequestOptions, url, downloadPath); | ||
} | ||
catch (error) { | ||
const badOrUnauthorizedRequest = error instanceof HttpError && (error.status === 400 || error.status === 401); | ||
const tooManyRequests = error instanceof HttpError && error.status === 429; | ||
// do not retry when we've got bad or unauthorized request or enough attempts | ||
if (badOrUnauthorizedRequest || attempts === MAX_ATTEMPTS) { | ||
throw error; | ||
} | ||
const randomIngridient = Math.random() * 500; | ||
const attemptsDelayMS = Math.pow(2, attempts) * exports.ONE_SEC_IN_MS; | ||
let nextAttemptDelayMS = randomIngridient + attemptsDelayMS; | ||
if (tooManyRequests) { | ||
// when too many requests received wait longer | ||
nextAttemptDelayMS += 3 * exports.ONE_SEC_IN_MS * attempts; | ||
} | ||
debug_1.debug('download file error: %o, next attempt delay: %d, url %s, path: %s', error, nextAttemptDelayMS, url, downloadPath); | ||
await wait(nextAttemptDelayMS); | ||
} | ||
} | ||
} | ||
exports.download = download; | ||
async function _downloadFile(requestOptions, url, downloadPath) { | ||
// first ensure that directory where we want to download file exists | ||
fs_extra_1.ensureDirSync(path_1.default.dirname(downloadPath)); | ||
// create write file stream that we'll write data into - first as unconfirmed temp file | ||
const tmpFilePath = `${downloadPath}${crypto_1.default.randomBytes(8).toString('hex')}.unconfirmed`; | ||
const fileWriteStream = fs_extra_1.createWriteStream(tmpFilePath); | ||
try { | ||
// based on https://github.com/nodejs/node/issues/28172 - only reliable way to consume response stream and avoiding all the 'gotchas' | ||
await new Promise((resolve, reject) => { | ||
const req = https_1.default | ||
.get(url, requestOptions, (res) => { | ||
const { statusCode } = res; | ||
if (statusCode !== 200) { | ||
// read the error response text and throw it as an HttpError | ||
res.setEncoding('utf8'); | ||
let body = ''; | ||
res.on('data', (chunk) => (body += chunk)); | ||
res.on('end', () => { | ||
reject(new HttpError(statusCode, body, url)); | ||
}); | ||
} | ||
else { | ||
// consume the response stream by writing it to the file | ||
res | ||
.on('error', reject) | ||
.on('aborted', () => reject(new Error('Request aborted'))) | ||
.pipe(fileWriteStream) | ||
.on('error', reject) | ||
.on('finish', resolve); | ||
} | ||
}) | ||
.on('error', reject) | ||
.on('timeout', () => { | ||
debug_1.debug('download file request timeout, %s', url); | ||
req.abort(); | ||
}); | ||
}); | ||
// finally when saving from the network to file has succeded, rename tmp file to normal name | ||
// then we're sure that responses is 100% saved and also even if different process was doing the same we're good | ||
await fs_extra_1.rename(tmpFilePath, downloadPath); | ||
} | ||
finally { | ||
fileWriteStream.destroy(); | ||
try { | ||
fs_extra_1.removeSync(tmpFilePath); | ||
} | ||
catch { } | ||
} | ||
} | ||
//# sourceMappingURL=handy.js.map |
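The retry loop above sleeps 2^attempts seconds plus up to 500 ms of jitter, and adds 3 s × attempts on HTTP 429. A small sketch that reproduces only that delay computation (names mirror the source; computeNextDelayMS itself is a hypothetical helper):

const ONE_SEC_IN_MS = 1000

// mirrors the delay math inside download(); `attempts` is 1-based
function computeNextDelayMS(attempts: number, tooManyRequests: boolean): number {
  const jitter = Math.random() * 500 // spreads out concurrent retriers
  let delayMS = jitter + Math.pow(2, attempts) * ONE_SEC_IN_MS
  if (tooManyRequests) {
    delayMS += 3 * ONE_SEC_IN_MS * attempts // rate limited: back off harder
  }
  return delayMS
}

// ignoring jitter: attempt 1 waits ~2 s, 2 -> ~4 s, 3 -> ~8 s, 4 -> ~16 s;
// with MAX_ATTEMPTS = 5 the fifth failure is rethrown instead of retried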
index.d.ts
@@ -13,3 +13,4 @@ export * from './apikeyaccessinfo';
 export * from './stream';
+export * from './downloaddatasets';
 export * from './types';
 //# sourceMappingURL=index.d.ts.map
index.js
@@ -19,2 +19,3 @@ "use strict";
 __export(require("./stream"));
+__export(require("./downloaddatasets"));
 //# sourceMappingURL=index.js.map
options.d.ts
@@ -5,2 +5,3 @@ export declare function init(initOptions?: Partial<Options>): void;
     endpoint: string;
+    datasetsEndpoint: string;
     cacheDir: string;
@@ -7,0 +8,0 @@ apiKey: string;
options.js
@@ -11,2 +11,3 @@ "use strict";
     endpoint: 'https://api.tardis.dev/v1',
+    datasetsEndpoint: 'https://datasets.tardis.dev/v1',
     cacheDir: path_1.default.join(os_1.default.tmpdir(), '.tardis-cache'),
@@ -13,0 +14,0 @@ apiKey: '',
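The only option change is the new datasetsEndpoint default. A sketch of overriding it through init (assuming init is re-exported from the package root, per the declaration in options.d.ts above; the key and endpoint values are placeholders):

import { init } from 'tardis-dev'

init({
  apiKey: 'YOUR_API_KEY', // placeholder
  // only needed when routing dataset downloads through a different host, e.g. a caching proxy
  datasetsEndpoint: 'https://datasets.tardis.dev/v1'
})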
worker.js
@@ -6,14 +6,7 @@ "use strict";
 Object.defineProperty(exports, "__esModule", { value: true });
-const crypto_1 = __importDefault(require("crypto"));
 const debug_1 = __importDefault(require("debug"));
 const fs_extra_1 = require("fs-extra");
-const https_1 = __importDefault(require("https"));
 const p_map_1 = __importDefault(require("p-map"));
-const path_1 = __importDefault(require("path"));
 const worker_threads_1 = require("worker_threads");
 const handy_1 = require("./handy");
-const httpsAgent = new https_1.default.Agent({
-    keepAlive: true,
-    keepAliveMsecs: 10 * handy_1.ONE_SEC_IN_MS
-});
 const debug = debug_1.default('tardis-dev');
@@ -48,9 +41,18 @@ if (worker_threads_1.isMainThread) {
 }
-async function getDataFeedSlice(payload, offset, filters, cacheDir) {
-    const sliceTimestamp = handy_1.addMinutes(payload.fromDate, offset);
+async function getDataFeedSlice({ exchange, fromDate, endpoint, apiKey, userAgent }, offset, filters, cacheDir) {
+    const sliceTimestamp = handy_1.addMinutes(fromDate, offset);
     const sliceKey = sliceTimestamp.toISOString();
     const slicePath = `${cacheDir}/${handy_1.formatDateToPath(sliceTimestamp)}.json.gz`;
     const isCached = fs_extra_1.existsSync(slicePath);
+    let url = `${endpoint}/data-feeds/${exchange}?from=${fromDate.toISOString()}&offset=${offset}`;
+    if (filters.length > 0) {
+        url += `&filters=${encodeURIComponent(JSON.stringify(filters))}`;
+    }
     if (!isCached) {
-        await reliablyFetchAndCacheSlice(payload, offset, filters, slicePath);
+        await handy_1.download({
+            apiKey,
+            downloadPath: slicePath,
+            url,
+            userAgent
+        });
         debug('getDataFeedSlice fetched from API and cached, %s', sliceKey);
@@ -68,88 +70,2 @@ }
 }
-async function reliablyFetchAndCacheSlice({ exchange, fromDate, endpoint, apiKey, userAgent }, offset, filters, sliceCachePath) {
-    let url = `${endpoint}/data-feeds/${exchange}?from=${fromDate.toISOString()}&offset=${offset}`;
-    if (filters.length > 0) {
-        url += `&filters=${encodeURIComponent(JSON.stringify(filters))}`;
-    }
-    const httpRequestOptions = {
-        agent: httpsAgent,
-        timeout: 45 * handy_1.ONE_SEC_IN_MS,
-        headers: {
-            'Accept-Encoding': 'gzip',
-            'User-Agent': userAgent,
-            Authorization: apiKey ? `Bearer ${apiKey}` : ''
-        }
-    };
-    const MAX_ATTEMPTS = 7;
-    let attempts = 0;
-    while (true) {
-        // simple retry logic when fetching from the network...
-        attempts++;
-        try {
-            return await fetchAndCacheSlice(url, httpRequestOptions, sliceCachePath);
-        }
-        catch (error) {
-            const badOrUnauthorizedRequest = error instanceof handy_1.HttpError && (error.status === 400 || error.status === 401);
-            const tooManyRequests = error instanceof handy_1.HttpError && error.status === 429;
-            // do not retry when we got a bad or unauthorized request or ran out of attempts
-            if (badOrUnauthorizedRequest || attempts === MAX_ATTEMPTS) {
-                throw error;
-            }
-            const randomIngridient = Math.random() * 500;
-            const attemptsDelayMS = Math.pow(2, attempts) * handy_1.ONE_SEC_IN_MS;
-            let nextAttemptDelayMS = randomIngridient + attemptsDelayMS;
-            if (tooManyRequests) {
-                // when too many requests are received, wait longer
-                nextAttemptDelayMS += 3 * handy_1.ONE_SEC_IN_MS * attempts;
-            }
-            debug('fetchAndCacheSlice error: %o, next attempt delay: %d, path: %s', error, nextAttemptDelayMS, sliceCachePath);
-            await handy_1.wait(nextAttemptDelayMS);
-        }
-    }
-}
-async function fetchAndCacheSlice(url, options, sliceCachePath) {
-    // first ensure that the directory where we want to cache the slice exists
-    fs_extra_1.ensureDirSync(path_1.default.dirname(sliceCachePath));
-    // create a write file stream that we'll save slice data into - first as an unconfirmed temp file
-    const tmpFilePath = `${sliceCachePath}${crypto_1.default.randomBytes(8).toString('hex')}.unconfirmed`;
-    const fileWriteStream = fs_extra_1.createWriteStream(tmpFilePath);
-    try {
-        // based on https://github.com/nodejs/node/issues/28172 - the only reliable way to consume the response stream while avoiding all the 'gotchas'
-        await new Promise((resolve, reject) => {
-            const req = https_1.default
-                .get(url, options, (res) => {
-                const { statusCode } = res;
-                if (statusCode !== 200) {
-                    // read the error response text and throw it as an HttpError
-                    res.setEncoding('utf8');
-                    let body = '';
-                    res.on('data', (chunk) => (body += chunk));
-                    res.on('end', () => {
-                        reject(new handy_1.HttpError(statusCode, body, url));
-                    });
-                }
-                else {
-                    // consume the response stream by writing it to the file
-                    res
-                        .on('error', reject)
-                        .on('aborted', () => reject(new Error('Request aborted')))
-                        .pipe(fileWriteStream)
-                        .on('error', reject)
-                        .on('finish', resolve);
-                }
-            })
-                .on('error', reject)
-                .on('timeout', () => {
-                debug('fetchAndCacheSlice request timeout');
-                req.abort();
-            });
-        });
-    }
-    finally {
-        fileWriteStream.destroy();
-    }
-    // lastly, when saving from the network to the file succeeded, rename the tmp file to its normal name
-    // so we're sure the response is 100% saved, and even if a different process was doing the same we're good
-    await fs_extra_1.rename(tmpFilePath, sliceCachePath);
-}
 //# sourceMappingURL=worker.js.map
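With this refactor, getDataFeedSlice builds the replay URL inline and hands fetching off to the shared download helper. A sketch of the URL and cache key it produces for one minute slice (the exchange, date, and filters are example values; formatDateToPath's exact output format is an assumption based on its name):

const endpoint = 'https://api.tardis.dev/v1'
const exchange = 'bitmex'                             // example exchange
const fromDate = new Date('2019-08-01T00:00:00.000Z') // example replay start
const offset = 2                                      // minutes from fromDate
const filters = [{ channel: 'trade', symbols: ['XBTUSD'] }]

// same construction as in getDataFeedSlice above
let url = `${endpoint}/data-feeds/${exchange}?from=${fromDate.toISOString()}&offset=${offset}`
if (filters.length > 0) {
  url += `&filters=${encodeURIComponent(JSON.stringify(filters))}`
}
// sliceTimestamp = addMinutes(fromDate, offset) -> 2019-08-01T00:02:00.000Z,
// and the slice is cached at `${cacheDir}/${formatDateToPath(sliceTimestamp)}.json.gz`,
// so replaying the same minute again is served from disk without any HTTP request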
package.json
 {
     "name": "tardis-dev",
-    "version": "10.0.14",
+    "version": "10.0.15",
     "engines": {
@@ -5,0 +5,0 @@ "node": ">=12"
src/exchangedetails.ts
@@ -12,21 +12,53 @@ import got from 'got'
-export type ExchangeDetails<T extends Exchange> = {
+export type SymbolType = 'spot' | 'future' | 'perpetual' | 'option'
+export type Stats = {
+  trades: number
+  bookChanges: number
+}
+export type DatasetType = 'trades' | 'incremental_book_L2' | 'quotes' | 'derivative_ticker'
+type Datasets = {
+  dataTypes: DatasetType[]
+  formats: ['csv']
+  exportedFrom: Date
+  exportedUntil: Date
+  stats: Stats
+  symbols: {
+    id: string
+    type: SymbolType
+    availableSince: string
+    availableTo: string
+    stats: Stats
+  }[]
+}
+export type ExchangeDetailsBase<T extends Exchange> = {
   id: T
   name: string
-  filterable: boolean
+  enabled: boolean
+  filterable: boolean
   availableSince: string
+  availableChannels: FilterForExchange[T]['channel'][]
   availableSymbols: {
     id: string
-    type: 'spot' | 'future' | 'perpetual' | 'option'
+    type: SymbolType
     availableSince: string
     availableTo?: string
     name?: string
   }[]
-  availableChannels: FilterForExchange[T]['channel'][]
   incidentReports: {
     from: string
     to: string
-    status: string
+    status: 'resolved' | 'wontfix'
     details: string
   }
 }
+type ExchangeDetails<T extends Exchange> =
+  | (ExchangeDetailsBase<T> & { supportsDatasets: false })
+  | (ExchangeDetailsBase<T> & { supportsDatasets: true; datasets: Datasets })
src/handy.ts
@@ -1,4 +0,8 @@
-import { createHash } from 'crypto'
+import crypto, { createHash } from 'crypto'
+import { createWriteStream, ensureDirSync, rename, removeSync } from 'fs-extra'
+import https, { RequestOptions } from 'https'
+import path from 'path'
 import { debug } from './debug'
 import { Mapper } from './mappers'
-import { Disconnect, Exchange, FilterForExchange, Filter } from './types'
+import { Disconnect, Exchange, Filter, FilterForExchange } from './types'
@@ -219,1 +223,109 @@ export function parseAsUTCDate(val: string) {
 }
+const httpsAgent = new https.Agent({
+  keepAlive: true,
+  keepAliveMsecs: 10 * ONE_SEC_IN_MS
+})
+export async function download({
+  apiKey,
+  downloadPath,
+  url,
+  userAgent
+}: {
+  url: string
+  downloadPath: string
+  userAgent: string
+  apiKey: string
+}) {
+  const httpRequestOptions = {
+    agent: httpsAgent,
+    timeout: 45 * ONE_SEC_IN_MS,
+    headers: {
+      'Accept-Encoding': 'gzip',
+      'User-Agent': userAgent,
+      Authorization: apiKey ? `Bearer ${apiKey}` : ''
+    }
+  }
+  const MAX_ATTEMPTS = 5
+  let attempts = 0
+  while (true) {
+    // simple retry logic when fetching from the network...
+    attempts++
+    try {
+      return await _downloadFile(httpRequestOptions, url, downloadPath)
+    } catch (error) {
+      const badOrUnauthorizedRequest = error instanceof HttpError && (error.status === 400 || error.status === 401)
+      const tooManyRequests = error instanceof HttpError && error.status === 429
+      // do not retry when we got a bad or unauthorized request or ran out of attempts
+      if (badOrUnauthorizedRequest || attempts === MAX_ATTEMPTS) {
+        throw error
+      }
+      const randomIngridient = Math.random() * 500
+      const attemptsDelayMS = Math.pow(2, attempts) * ONE_SEC_IN_MS
+      let nextAttemptDelayMS = randomIngridient + attemptsDelayMS
+      if (tooManyRequests) {
+        // when too many requests are received, wait longer
+        nextAttemptDelayMS += 3 * ONE_SEC_IN_MS * attempts
+      }
+      debug('download file error: %o, next attempt delay: %d, url %s, path: %s', error, nextAttemptDelayMS, url, downloadPath)
+      await wait(nextAttemptDelayMS)
+    }
+  }
+}
+async function _downloadFile(requestOptions: RequestOptions, url: string, downloadPath: string) {
+  // first ensure that the directory where we want to download the file exists
+  ensureDirSync(path.dirname(downloadPath))
+  // create a write file stream that we'll write data into - first as an unconfirmed temp file
+  const tmpFilePath = `${downloadPath}${crypto.randomBytes(8).toString('hex')}.unconfirmed`
+  const fileWriteStream = createWriteStream(tmpFilePath)
+  try {
+    // based on https://github.com/nodejs/node/issues/28172 - the only reliable way to consume the response stream while avoiding all the 'gotchas'
+    await new Promise((resolve, reject) => {
+      const req = https
+        .get(url, requestOptions, (res) => {
+          const { statusCode } = res
+          if (statusCode !== 200) {
+            // read the error response text and throw it as an HttpError
+            res.setEncoding('utf8')
+            let body = ''
+            res.on('data', (chunk) => (body += chunk))
+            res.on('end', () => {
+              reject(new HttpError(statusCode!, body, url))
+            })
+          } else {
+            // consume the response stream by writing it to the file
+            res
+              .on('error', reject)
+              .on('aborted', () => reject(new Error('Request aborted')))
+              .pipe(fileWriteStream)
+              .on('error', reject)
+              .on('finish', resolve)
+          }
+        })
+        .on('error', reject)
+        .on('timeout', () => {
+          debug('download file request timeout, %s', url)
+          req.abort()
+        })
    })
+    // finally, when saving from the network to the file has succeeded, rename the tmp file to its normal name -
+    // then we're sure the response is 100% saved, and even if a different process was doing the same we're good
+    await rename(tmpFilePath, downloadPath)
+  } finally {
+    fileWriteStream.destroy()
+    try {
+      removeSync(tmpFilePath)
+    } catch {}
+  }
+}
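_downloadFile streams the response into a random .unconfirmed temp file and renames it only after the stream finishes, so a partially written download can never be mistaken for a complete one. A stripped-down sketch of that write-then-rename pattern on its own (writeAtomically is a hypothetical helper; HTTP and retries omitted):

import crypto from 'crypto'
import { createWriteStream, ensureDirSync, rename, removeSync } from 'fs-extra'
import path from 'path'

async function writeAtomically(targetPath: string, write: (stream: NodeJS.WritableStream) => Promise<void>) {
  ensureDirSync(path.dirname(targetPath))
  // random suffix: concurrent writers never collide on the temp file
  const tmpFilePath = `${targetPath}${crypto.randomBytes(8).toString('hex')}.unconfirmed`
  const fileWriteStream = createWriteStream(tmpFilePath)
  try {
    await write(fileWriteStream)
    // the rename is the commit point - readers only ever observe complete files
    await rename(tmpFilePath, targetPath)
  } finally {
    fileWriteStream.destroy()
    try {
      removeSync(tmpFilePath) // clean up the temp file if we never reached the rename
    } catch {}
  }
}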
src/index.ts
@@ -13,2 +13,3 @@ export * from './apikeyaccessinfo'
 export * from './stream'
+export * from './downloaddatasets'
 export * from './types'
src/options.ts
@@ -7,2 +7,3 @@ import os from 'os'
   endpoint: 'https://api.tardis.dev/v1',
+  datasetsEndpoint: 'https://datasets.tardis.dev/v1',
   cacheDir: path.join(os.tmpdir(), '.tardis-cache'),
@@ -25,2 +26,3 @@ apiKey: '',
   endpoint: string
+  datasetsEndpoint: string
   cacheDir: string
@@ -27,0 +29,0 @@ apiKey: string
src/worker.ts
@@ -1,16 +0,8 @@
-import crypto from 'crypto'
 import dbg from 'debug'
-import { createWriteStream, ensureDirSync, existsSync, rename } from 'fs-extra'
-import https, { RequestOptions } from 'https'
+import { existsSync } from 'fs-extra'
 import pMap from 'p-map'
-import path from 'path'
 import { isMainThread, parentPort, workerData } from 'worker_threads'
-import { addMinutes, formatDateToPath, HttpError, ONE_SEC_IN_MS, sequence, sha256, wait, optimizeFilters } from './handy'
+import { addMinutes, download, formatDateToPath, optimizeFilters, sequence, sha256 } from './handy'
 import { Exchange, Filter } from './types'
-const httpsAgent = new https.Agent({
-  keepAlive: true,
-  keepAliveMsecs: 10 * ONE_SEC_IN_MS
-})
 const debug = dbg('tardis-dev')
@@ -55,4 +47,9 @@
-async function getDataFeedSlice(payload: WorkerJobPayload, offset: number, filters: object[], cacheDir: string) {
-  const sliceTimestamp = addMinutes(payload.fromDate, offset)
+async function getDataFeedSlice(
+  { exchange, fromDate, endpoint, apiKey, userAgent }: WorkerJobPayload,
+  offset: number,
+  filters: object[],
+  cacheDir: string
+) {
+  const sliceTimestamp = addMinutes(fromDate, offset)
   const sliceKey = sliceTimestamp.toISOString()
@@ -62,4 +59,16 @@ const slicePath = `${cacheDir}/${formatDateToPath(sliceTimestamp)}.json.gz`
+  let url = `${endpoint}/data-feeds/${exchange}?from=${fromDate.toISOString()}&offset=${offset}`
+  if (filters.length > 0) {
+    url += `&filters=${encodeURIComponent(JSON.stringify(filters))}`
+  }
   if (!isCached) {
-    await reliablyFetchAndCacheSlice(payload, offset, filters, slicePath)
+    await download({
+      apiKey,
+      downloadPath: slicePath,
+      url,
+      userAgent
+    })
     debug('getDataFeedSlice fetched from API and cached, %s', sliceKey)
@@ -77,102 +86,2 @@ } else {
-async function reliablyFetchAndCacheSlice(
-  { exchange, fromDate, endpoint, apiKey, userAgent }: WorkerJobPayload,
-  offset: number,
-  filters: object[],
-  sliceCachePath: string
-) {
-  let url = `${endpoint}/data-feeds/${exchange}?from=${fromDate.toISOString()}&offset=${offset}`
-  if (filters.length > 0) {
-    url += `&filters=${encodeURIComponent(JSON.stringify(filters))}`
-  }
-  const httpRequestOptions = {
-    agent: httpsAgent,
-    timeout: 45 * ONE_SEC_IN_MS,
-    headers: {
-      'Accept-Encoding': 'gzip',
-      'User-Agent': userAgent,
-      Authorization: apiKey ? `Bearer ${apiKey}` : ''
-    }
-  }
-  const MAX_ATTEMPTS = 7
-  let attempts = 0
-  while (true) {
-    // simple retry logic when fetching from the network...
-    attempts++
-    try {
-      return await fetchAndCacheSlice(url, httpRequestOptions, sliceCachePath)
-    } catch (error) {
-      const badOrUnauthorizedRequest = error instanceof HttpError && (error.status === 400 || error.status === 401)
-      const tooManyRequests = error instanceof HttpError && error.status === 429
-      // do not retry when we got a bad or unauthorized request or ran out of attempts
-      if (badOrUnauthorizedRequest || attempts === MAX_ATTEMPTS) {
-        throw error
-      }
-      const randomIngridient = Math.random() * 500
-      const attemptsDelayMS = Math.pow(2, attempts) * ONE_SEC_IN_MS
-      let nextAttemptDelayMS = randomIngridient + attemptsDelayMS
-      if (tooManyRequests) {
-        // when too many requests are received, wait longer
-        nextAttemptDelayMS += 3 * ONE_SEC_IN_MS * attempts
-      }
-      debug('fetchAndCacheSlice error: %o, next attempt delay: %d, path: %s', error, nextAttemptDelayMS, sliceCachePath)
-      await wait(nextAttemptDelayMS)
-    }
-  }
-}
-async function fetchAndCacheSlice(url: string, options: RequestOptions, sliceCachePath: string) {
-  // first ensure that the directory where we want to cache the slice exists
-  ensureDirSync(path.dirname(sliceCachePath))
-  // create a write file stream that we'll save slice data into - first as an unconfirmed temp file
-  const tmpFilePath = `${sliceCachePath}${crypto.randomBytes(8).toString('hex')}.unconfirmed`
-  const fileWriteStream = createWriteStream(tmpFilePath)
-  try {
-    // based on https://github.com/nodejs/node/issues/28172 - the only reliable way to consume the response stream while avoiding all the 'gotchas'
-    await new Promise((resolve, reject) => {
-      const req = https
-        .get(url, options, (res) => {
-          const { statusCode } = res
-          if (statusCode !== 200) {
-            // read the error response text and throw it as an HttpError
-            res.setEncoding('utf8')
-            let body = ''
-            res.on('data', (chunk) => (body += chunk))
-            res.on('end', () => {
-              reject(new HttpError(statusCode!, body, url))
-            })
-          } else {
-            // consume the response stream by writing it to the file
-            res
-              .on('error', reject)
-              .on('aborted', () => reject(new Error('Request aborted')))
-              .pipe(fileWriteStream)
-              .on('error', reject)
-              .on('finish', resolve)
-          }
-        })
-        .on('error', reject)
-        .on('timeout', () => {
-          debug('fetchAndCacheSlice request timeout')
-          req.abort()
-        })
-    })
-  } finally {
-    fileWriteStream.destroy()
-  }
-  // lastly, when saving from the network to the file succeeded, rename the tmp file to its normal name
-  // so we're sure the response is 100% saved, and even if a different process was doing the same we're good
-  await rename(tmpFilePath, sliceCachePath)
-}
 export type WorkerMessage = {
@@ -179,0 +88,0 @@ sliceKey: string