tardis-machine
Comparing version 2.0.0-alpha.0 to 2.0.0-beta.0
#!/usr/bin/env node | ||
const yargs = require('yargs') | ||
const { TardisClient } = require('tardis-client') | ||
const os = require('os') | ||
const path = require('path') | ||
const { TardisMachine } = require('../dist') | ||
@@ -20,3 +22,3 @@ | ||
describe: 'Local cache dir path ', | ||
default: TardisClient._defaultOptions.cacheDir | ||
default: path.join(os.tmpdir(), '.tardis-cache') | ||
}) | ||
@@ -38,3 +40,3 @@ .option('clear-cache', { | ||
.example('$0 --api-key=YOUR_API_KEY') | ||
.epilogue('Check out https://tardis.dev for more information.') | ||
.epilogue('See https://docs.tardis.dev/api/tardis-machine for more information.') | ||
.detectLocale(false).argv | ||
@@ -41,0 +43,0 @@ |
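For orientation, the updated bin script ends up doing roughly the following once yargs has parsed the flags: build an options object (with the cache directory now defaulting to a temp-dir path instead of the old TardisClient default) and hand it to TardisMachine. A minimal sketch, assuming the package's main entry re-exports TardisMachine; the port value and environment variable below are illustrative, not taken from this diff.

// Hypothetical, trimmed-down equivalent of the CLI entry point (sketch only).
import os from 'os'
import path from 'path'
import { TardisMachine } from 'tardis-machine'

const machine = new TardisMachine({
  apiKey: process.env.TARDIS_API_KEY,                 // assumed env var
  cacheDir: path.join(os.tmpdir(), '.tardis-cache'),  // new default in 2.0.0-beta.0
  clearCache: false                                   // mirrors the --clear-cache flag
})

machine.run(8000).catch(err => {                      // 8000 is an assumed port
  console.error(err)
  process.exit(1)
})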
export declare class TardisMachine { | ||
private readonly options; | ||
private readonly _tardisClient; | ||
private readonly _httpServer; | ||
private readonly _replaySessionsMeta; | ||
constructor(options: Options); | ||
private _initWebSocketServer; | ||
run(port: number): Promise<void>; | ||
stop(): Promise<void>; | ||
private _addToOrCreateNewReplaySession; | ||
private _writeDataFeedMessagesToResponse; | ||
private _setUpWebsocketClientsRelatedRoutesRoutes; | ||
} | ||
@@ -13,0 +9,0 @@ declare type Options = { |
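The declaration above narrows the public surface to the constructor, run(port) and stop(), so a host process can wrap the machine in a simple lifecycle along these lines. A sketch under the assumption that stop() tears down the underlying HTTP/WebSocket server; the signal handling and port are illustrative.

// Sketch: run tardis-machine and shut it down cleanly on SIGINT/SIGTERM.
import { TardisMachine } from 'tardis-machine'

async function main() {
  const machine = new TardisMachine({ apiKey: process.env.TARDIS_API_KEY })
  await machine.run(8000)                      // assumed port

  const shutdown = async () => {
    await machine.stop()                       // assumed to close the underlying server
    process.exit(0)
  }
  process.on('SIGINT', shutdown)
  process.on('SIGTERM', shutdown)
}

main().catch(err => {
  console.error(err)
  process.exit(1)
})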
@@ -6,17 +6,14 @@ "use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
const find_my_way_1 = __importDefault(require("find-my-way")); | ||
const http_1 = __importDefault(require("http")); | ||
const events_1 = require("events"); | ||
const is_docker_1 = __importDefault(require("is-docker")); | ||
const tardis_dev_1 = require("tardis-dev"); | ||
const url_1 = __importDefault(require("url")); | ||
const debug_1 = __importDefault(require("debug")); | ||
const find_my_way_1 = __importDefault(require("find-my-way")); | ||
const is_docker_1 = __importDefault(require("is-docker")); | ||
const ws_1 = __importDefault(require("ws")); | ||
const tardis_client_1 = require("tardis-client"); | ||
const replaysession_1 = require("./replaysession"); | ||
const debug = debug_1.default('tardis-machine'); | ||
const http_2 = require("./http"); | ||
const ws_2 = require("./ws"); | ||
class TardisMachine { | ||
constructor(options) { | ||
this.options = options; | ||
this._replaySessionsMeta = []; | ||
this._tardisClient = new tardis_client_1.TardisClient({ | ||
tardis_dev_1.init({ | ||
apiKey: options.apiKey, | ||
@@ -29,105 +26,34 @@ cacheDir: options.cacheDir | ||
}); | ||
// setup /replay streaming http route | ||
router.on('GET', '/replay', async (req, res) => { | ||
try { | ||
const startTimestamp = new Date().getTime(); | ||
const parsedQuery = url_1.default.parse(req.url, true).query; | ||
const filtersQuery = parsedQuery['filters']; | ||
const replayOptions = { | ||
exchange: parsedQuery['exchange'], | ||
from: parsedQuery['from'], | ||
to: parsedQuery['to'], | ||
filters: filtersQuery ? JSON.parse(filtersQuery) : undefined | ||
}; | ||
debug('GET /replay request started, options: %o', replayOptions); | ||
const streamedMessagesCount = await this._writeDataFeedMessagesToResponse(res, replayOptions); | ||
const endTimestamp = new Date().getTime(); | ||
debug('GET /replay request finished, options: %o, time: %d seconds, total messages count:%d', replayOptions, (endTimestamp - startTimestamp) / 1000, streamedMessagesCount); | ||
} | ||
catch (e) { | ||
const errorInfo = { | ||
responseText: e.responseText, | ||
message: e.message, | ||
url: e.url | ||
}; | ||
debug('GET /replay request error: %o', e); | ||
console.error('GET /replay request error:', e); | ||
if (!res.finished) { | ||
res.statusCode = e.status || 500; | ||
res.end(JSON.stringify(errorInfo)); | ||
} | ||
} | ||
}); | ||
router.on('GET', '/replay-normalized', async (req, res) => { | ||
const parsedQuery = url_1.default.parse(req.url, true).query; | ||
const dataTypeFromQuery = parsedQuery['datatype']; | ||
const provideSnapshots = dataTypeFromQuery === 'book_snapshot.1m'; | ||
const normalizedMessages = this._tardisClient.replayNormalized({ | ||
exchange: parsedQuery['exchange'], | ||
dataTypes: provideSnapshots ? 'book_change' : dataTypeFromQuery, | ||
from: parsedQuery['from'], | ||
to: parsedQuery['to'], | ||
symbols: [parsedQuery['symbol']] | ||
}); | ||
res.setHeader('Content-Type', 'application/json'); | ||
const orderBook = new tardis_client_1.OrderBook(); | ||
let lastMessageTimestamp; | ||
for await (const message of normalizedMessages) { | ||
if (provideSnapshots) { | ||
if (!lastMessageTimestamp) { | ||
lastMessageTimestamp = message.localTimestamp; | ||
} | ||
if (message.localTimestamp.getUTCMinutes() !== lastMessageTimestamp.getUTCMinutes() || | ||
message.localTimestamp.getUTCHours() !== lastMessageTimestamp.getUTCHours()) { | ||
const timestamp = new Date(lastMessageTimestamp); | ||
timestamp.setUTCSeconds(0, 0); | ||
const snapshot = { | ||
type: 'book_snapshot.1m', | ||
symbol: message.symbol, | ||
timestamp, | ||
timestampReal: lastMessageTimestamp, | ||
asks: Array.from(orderBook.asks()), | ||
bids: Array.from(orderBook.bids()) | ||
}; | ||
const ok = res.write(`${JSON.stringify(snapshot)}\n`); | ||
// let's handle backpressure - https://nodejs.org/api/http.html#http_request_write_chunk_encoding_callback | ||
if (!ok) { | ||
await events_1.once(res, 'drain'); | ||
} | ||
} | ||
orderBook.update(message); | ||
lastMessageTimestamp = message.localTimestamp; | ||
} | ||
else { | ||
const ok = res.write(`${JSON.stringify(message)}\n`); | ||
// let's handle backpressure - https://nodejs.org/api/http.html#http_request_write_chunk_encoding_callback | ||
if (!ok) { | ||
await events_1.once(res, 'drain'); | ||
} | ||
} | ||
} | ||
console.log('end'); | ||
res.end(''); | ||
}); | ||
// set timeout to 0 meaning infinite http timeout - streaming may take some time especially for longer date ranges | ||
this._httpServer.timeout = 0; | ||
const websocketServer = new ws_1.default.Server({ noServer: true }); | ||
this._httpServer.on('upgrade', function upgrade(request, socket, head) { | ||
const pathname = url_1.default.parse(request.url).pathname; | ||
// setup websocket server for /ws-replay path | ||
if (pathname === '/ws-replay') { | ||
websocketServer.handleUpgrade(request, socket, head, function done(ws) { | ||
websocketServer.emit('connection', ws, request); | ||
}); | ||
router.on('GET', '/replay', http_2.replayHttp); | ||
router.on('GET', '/replay-normalized', http_2.replayNormalizedHttp); | ||
router.on('GET', '/api/v1/schema/websocketHelp', http_2.bitmexWsHelp); | ||
this._initWebSocketServer(); | ||
} | ||
_initWebSocketServer() { | ||
const websocketServer = new ws_1.default.Server({ server: this._httpServer }); | ||
// super simple routing for websocket routes | ||
const routes = { | ||
'/ws-replay': ws_2.replayWS, | ||
'/ws-replay-normalized': ws_2.replayNormalizedWS, | ||
'/ws-stream-normalized': ws_2.streamNormalizedWS | ||
}; | ||
websocketServer.on('connection', async (ws, request) => { | ||
const path = url_1.default | ||
.parse(request.url) | ||
.pathname.replace(/\/$/, '') | ||
.toLocaleLowerCase(); | ||
const matchingRoute = routes[path]; | ||
if (matchingRoute !== undefined) { | ||
matchingRoute(ws, request); | ||
} | ||
else { | ||
socket.destroy(); | ||
ws.close(1008); | ||
} | ||
}); | ||
websocketServer.on('connection', this._addToOrCreateNewReplaySession.bind(this)); | ||
this._setUpWebsocketClientsRelatedRoutesRoutes(router); | ||
} | ||
async run(port) { | ||
if (this.options.clearCache) { | ||
await this._tardisClient.clearCache(); | ||
await tardis_dev_1.clearCache(); | ||
} | ||
@@ -144,8 +70,8 @@ await new Promise((resolve, reject) => { | ||
if (is_docker_1.default() && !process.env.RUNKIT_HOST) { | ||
console.log(`TardisMachine is running inside Docker container...`); | ||
console.log(`tardis-machine is running inside Docker container...`); | ||
} | ||
else { | ||
console.log(`TardisMachine is running on ${port} port...`); | ||
console.log(`tardis-machine is running on ${port} port...`); | ||
} | ||
console.log(`Check out https://docs.tardis.dev for help or more information.`); | ||
console.log(`See https://docs.tardis.dev/api/tardis-machine for more information.`); | ||
} | ||
@@ -159,104 +85,4 @@ async stop() { | ||
} | ||
async _addToOrCreateNewReplaySession(ws, upgReq) { | ||
const parsedQuery = url_1.default.parse(upgReq.url, true).query; | ||
const from = parsedQuery['from']; | ||
const to = parsedQuery['to']; | ||
const replaySessionKey = `${from}-${to}`; | ||
// if there are multiple separate ws connections being made for the same date ranges | ||
// in a short time frame (5 seconds), | ||
// consolidate them into a single replay session that will make sure that messages being sent via websocket connections | ||
// are synchronized by local timestamp | ||
const matchingReplaySessionMeta = this._replaySessionsMeta.find(s => s.sessionKey == replaySessionKey && s.session.hasStarted == false); | ||
if (matchingReplaySessionMeta) { | ||
matchingReplaySessionMeta.session.addToSession(new replaysession_1.WebsocketConnection(ws, this._tardisClient, parsedQuery)); | ||
} | ||
else { | ||
const newReplaySession = new replaysession_1.ReplaySession(); | ||
const meta = { | ||
sessionKey: replaySessionKey, | ||
session: newReplaySession | ||
}; | ||
this._replaySessionsMeta.push(meta); | ||
newReplaySession.addToSession(new replaysession_1.WebsocketConnection(ws, this._tardisClient, parsedQuery)); | ||
newReplaySession.onClose(() => { | ||
const toRemove = this._replaySessionsMeta.indexOf(meta); | ||
this._replaySessionsMeta.splice(toRemove, 1); | ||
}); | ||
} | ||
} | ||
async _writeDataFeedMessagesToResponse(res, { exchange, from, to, filters }) { | ||
const responsePrefixBuffer = Buffer.from('{"localTimestamp":"'); | ||
const responseMiddleBuffer = Buffer.from('","message":'); | ||
const responseSuffixBuffer = Buffer.from('}\n'); | ||
const BATCH_SIZE = 32; | ||
// not 100% sure that's necessary since we're returning ndjson in fact, not json | ||
res.setHeader('Content-Type', 'application/json'); | ||
let buffers = []; | ||
let totalMessagesCount = 0; | ||
const dataFeedMessages = this._tardisClient.replay({ exchange, from, to, filters, skipDecoding: true }); | ||
for await (let { message, localTimestamp } of dataFeedMessages) { | ||
totalMessagesCount++; | ||
// instead of writing each message directly to response, | ||
// let's batch them and send in BATCH_SIZE batches (each message is 5 buffers: prefix etc) | ||
// also instead of converting messages to string or parsing them let's manually stitch together the desired json response using buffers, which is faster | ||
buffers.push(...[responsePrefixBuffer, localTimestamp, responseMiddleBuffer, message, responseSuffixBuffer]); | ||
if (buffers.length == BATCH_SIZE * 5) { | ||
const ok = res.write(Buffer.concat(buffers)); | ||
buffers = []; | ||
// let's handle backpressure - https://nodejs.org/api/http.html#http_request_write_chunk_encoding_callback | ||
if (!ok) { | ||
await events_1.once(res, 'drain'); | ||
} | ||
} | ||
} | ||
// write remaining buffers to response | ||
if (buffers.length > 0) { | ||
res.write(Buffer.concat(buffers)); | ||
buffers = []; | ||
} | ||
res.end(''); | ||
return totalMessagesCount; | ||
} | ||
_setUpWebsocketClientsRelatedRoutesRoutes(router) { | ||
// based on https://www.bitmex.com/api/v1/schema/websocketHelp | ||
const bitmexWSHelpResponse = JSON.stringify({ | ||
info: 'See https://www.bitmex.com/app/wsAPI and https://www.bitmex.com/explorer for more documentation.', | ||
usage: 'Send a message in the format: {"op": string, "args": Array<string>}', | ||
ops: ['authKey', 'authKeyExpires', 'cancelAllAfter', 'subscribe', 'unsubscribe'], | ||
subscribe: 'To subscribe, send: {"op": "subscribe", "args": [subscriptionTopic, ...]}.', | ||
subscriptionSubjects: { | ||
authenticationRequired: [], | ||
public: [ | ||
'announcement', | ||
'connected', | ||
'chat', | ||
'publicNotifications', | ||
'instrument', | ||
'settlement', | ||
'funding', | ||
'insurance', | ||
'liquidation', | ||
'orderBookL2', | ||
'orderBookL2_25', | ||
'orderBook10', | ||
'quote', | ||
'trade', | ||
'quoteBin1m', | ||
'quoteBin5m', | ||
'quoteBin1h', | ||
'quoteBin1d', | ||
'tradeBin1m', | ||
'tradeBin5m', | ||
'tradeBin1h', | ||
'tradeBin1d' | ||
] | ||
} | ||
}); | ||
router.on('GET', '/api/v1/schema/websocketHelp', async (_, res) => { | ||
res.setHeader('Content-Type', 'application/json'); | ||
res.end(bitmexWSHelpResponse); | ||
}); | ||
} | ||
} | ||
exports.TardisMachine = TardisMachine; | ||
//# sourceMappingURL=tardismachine.js.map |
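Both /replay and /replay-normalized above stream newline-delimited JSON and pause on 'drain' for backpressure, so a consumer only needs to read the response body line by line. A rough client sketch for the raw /replay route using node-fetch and split2 (both already in this package's devDependencies); the host, port, date range and the BitMEX trade filter are illustrative assumptions.

// Sketch: consume the NDJSON stream produced by GET /replay.
import fetch from 'node-fetch'
import split2 from 'split2'

async function replayRawMessages() {
  // filters shape follows the tardis.dev filters format; values here are examples
  const filters = JSON.stringify([{ channel: 'trade', symbols: ['XBTUSD'] }])
  const url =
    'http://localhost:8000/replay?exchange=bitmex' +
    `&from=2019-05-01&to=2019-05-02&filters=${encodeURIComponent(filters)}`

  const response = await fetch(url)

  // each line is stitched together server-side as {"localTimestamp":"...","message":...}
  for await (const line of response.body.pipe(split2())) {
    const { localTimestamp, message } = JSON.parse(line)
    console.log(localTimestamp, message)
  }
}

replayRawMessages().catch(console.error)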
{ | ||
"name": "tardis-machine", | ||
"version": "2.0.0-alpha.0", | ||
"version": "2.0.0-beta.0", | ||
"engines": { | ||
"node": ">=12" | ||
}, | ||
"description": "https://tardis.dev API client with built-in local caching providing on-demand tick-level market data replay from any point in time in exchange's Websocket data format.", | ||
"description": "Fast locally installable server with built-in local caching, available via NPM & Docker, providing on-demand tick-level cryptocurrency real-time streaming and market data replay from any moment in time in exchange's WebSocket data format that uses tardis.dev HTTP API under the hood.", | ||
"main": "dist/index.js", | ||
@@ -16,5 +16,5 @@ "source": "src/index.js", | ||
"precommit": "lint-staged", | ||
"test": "jest", | ||
"test": "jest --runInBand --forceExit", | ||
"prepare": "npm run build", | ||
"release": "cross-var \"npm run test && npm run build && git commit -am $npm_package_version && git tag $npm_package_version && git push && git push --tags && npm publish --access=public\"" | ||
"release": "cross-var \"npm run build && git commit -am $npm_package_version && git tag $npm_package_version && git push && git push --tags && npm publish --access=public\"" | ||
}, | ||
@@ -45,27 +45,27 @@ "bin": { | ||
"dependencies": { | ||
"@types/ws": "^6.0.1", | ||
"@types/ws": "^6.0.3", | ||
"debug": "^4.1.1", | ||
"find-my-way": "^2.0.1", | ||
"find-my-way": "^2.2.1", | ||
"is-docker": "^2.0.0", | ||
"tardis-client": "^2.0.0-alpha.0", | ||
"yargs": "^13.2.4", | ||
"ws": "^7.0.0" | ||
"tardis-dev": "^7.7.5", | ||
"ws": "^7.2.0", | ||
"yargs": "^14.2.0" | ||
}, | ||
"devDependencies": { | ||
"@types/debug": "^4.1.4", | ||
"@types/jest": "^24.0.13", | ||
"@types/node": "^12.0.3", | ||
"@types/node-fetch": "^2.3.5", | ||
"@types/debug": "^4.1.5", | ||
"@types/jest": "^24.0.23", | ||
"@types/node": "^12.12.7", | ||
"@types/node-fetch": "^2.5.3", | ||
"@types/split2": "^2.1.6", | ||
"@types/yargs": "^13.0.0", | ||
"@types/yargs": "^13.0.3", | ||
"bitmex-realtime-api": "^0.4.0", | ||
"cross-var": "^1.1.0", | ||
"husky": "^1.2.0", | ||
"jest": "^24.7.1", | ||
"lint-staged": "^8.1.0", | ||
"husky": "^3.0.9", | ||
"jest": "^24.9.0", | ||
"lint-staged": "^9.4.3", | ||
"node-fetch": "^2.6.0", | ||
"prettier": "^1.17.0", | ||
"prettier": "^1.19.1", | ||
"split2": "^3.1.1", | ||
"ts-jest": "^24.0.2", | ||
"typescript": "^3.5.1" | ||
"ts-jest": "^24.1.0", | ||
"typescript": "^3.7.2" | ||
}, | ||
@@ -72,0 +72,0 @@ "lint-staged": { |
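The headline dependency change is the switch from tardis-client (class-based TardisClient) to tardis-dev, whose module-level init() and clearCache() the diffed source now calls directly. A rough sketch of that usage pattern; init()/clearCache() mirror the calls visible in this diff, while the replayNormalized/normalizeTrades call follows tardis-dev's published API and is illustrative rather than code from this package.

// Sketch of the tardis-dev style used by 2.0.0-beta.0 (module functions, no client class).
import { init, clearCache, replayNormalized, normalizeTrades } from 'tardis-dev'

init({
  apiKey: process.env.TARDIS_API_KEY,   // assumed env var
  cacheDir: './.tardis-cache'           // assumed path
})

async function example() {
  await clearCache()                    // counterpart of the --clear-cache flag

  const messages = replayNormalized(
    { exchange: 'bitmex', symbols: ['XBTUSD'], from: '2019-05-01', to: '2019-05-02' },
    normalizeTrades
  )

  for await (const trade of messages) {
    console.log(trade)
  }
}

example().catch(console.error)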
@@ -1,20 +0,15 @@ | ||
import http, { OutgoingMessage, IncomingMessage } from 'http' | ||
import { once } from 'events' | ||
import url from 'url' | ||
import dbg from 'debug' | ||
import findMyWay from 'find-my-way' | ||
import http from 'http' | ||
import isDocker from 'is-docker' | ||
import { clearCache, init } from 'tardis-dev' | ||
import url from 'url' | ||
import WebSocket from 'ws' | ||
import { TardisClient, OrderBook } from 'tardis-client' | ||
import { ReplaySession, WebsocketConnection } from './replaysession' | ||
import { bitmexWsHelp, replayHttp, replayNormalizedHttp } from './http' | ||
import { replayNormalizedWS, replayWS, streamNormalizedWS } from './ws' | ||
const debug = dbg('tardis-machine') | ||
export class TardisMachine { | ||
private readonly _tardisClient: TardisClient | ||
private readonly _httpServer: http.Server | ||
private readonly _replaySessionsMeta: { sessionKey: string; session: ReplaySession }[] = [] | ||
constructor(private readonly options: Options) { | ||
this._tardisClient = new TardisClient({ | ||
init({ | ||
apiKey: options.apiKey, | ||
@@ -25,2 +20,3 @@ cacheDir: options.cacheDir | ||
const router = findMyWay({ ignoreTrailingSlash: true }) | ||
this._httpServer = http.createServer((req, res) => { | ||
@@ -30,125 +26,36 @@ router.lookup(req, res) | ||
// setup /replay streaming http route | ||
router.on('GET', '/replay', async (req, res) => { | ||
try { | ||
const startTimestamp = new Date().getTime() | ||
const parsedQuery = url.parse(req.url!, true).query | ||
const filtersQuery = parsedQuery['filters'] as string | ||
// set timeout to 0 meaning infinite http timeout - streaming may take some time especially for longer date ranges | ||
this._httpServer.timeout = 0 | ||
const replayOptions = { | ||
exchange: parsedQuery['exchange'] as string, | ||
from: parsedQuery['from'] as string, | ||
to: parsedQuery['to'] as string, | ||
filters: filtersQuery ? JSON.parse(filtersQuery) : undefined | ||
} | ||
router.on('GET', '/replay', replayHttp) | ||
router.on('GET', '/replay-normalized', replayNormalizedHttp) | ||
router.on('GET', '/api/v1/schema/websocketHelp', bitmexWsHelp) | ||
debug('GET /replay request started, options: %o', replayOptions) | ||
this._initWebSocketServer() | ||
} | ||
const streamedMessagesCount = await this._writeDataFeedMessagesToResponse(res, replayOptions) | ||
const endTimestamp = new Date().getTime() | ||
private _initWebSocketServer() { | ||
const websocketServer = new WebSocket.Server({ server: this._httpServer }) | ||
debug( | ||
'GET /replay request finished, options: %o, time: %d seconds, total messages count:%d', | ||
replayOptions, | ||
(endTimestamp - startTimestamp) / 1000, | ||
streamedMessagesCount | ||
) | ||
} catch (e) { | ||
const errorInfo = { | ||
responseText: e.responseText, | ||
message: e.message, | ||
url: e.url | ||
} | ||
// super simple routing for websocket routes | ||
const routes = { | ||
'/ws-replay': replayWS, | ||
'/ws-replay-normalized': replayNormalizedWS, | ||
'/ws-stream-normalized': streamNormalizedWS | ||
} as any | ||
debug('GET /replay request error: %o', e) | ||
console.error('GET /replay request error:', e) | ||
websocketServer.on('connection', async (ws, request) => { | ||
const path = url | ||
.parse(request.url!) | ||
.pathname!.replace(/\/$/, '') | ||
.toLocaleLowerCase() | ||
if (!res.finished) { | ||
res.statusCode = e.status || 500 | ||
res.end(JSON.stringify(errorInfo)) | ||
} | ||
} | ||
}) | ||
const matchingRoute = routes[path] | ||
router.on('GET', '/replay-normalized', async (req, res) => { | ||
const parsedQuery = url.parse(req.url!, true).query | ||
const dataTypeFromQuery = parsedQuery['datatype'] as any | ||
const provideSnapshots = dataTypeFromQuery === 'book_snapshot.1m' | ||
const normalizedMessages = this._tardisClient.replayNormalized({ | ||
exchange: parsedQuery['exchange'] as any, | ||
dataTypes: provideSnapshots ? 'book_change' : dataTypeFromQuery, | ||
from: parsedQuery['from'] as string, | ||
to: parsedQuery['to'] as string, | ||
symbols: [parsedQuery['symbol'] as any] | ||
}) | ||
res.setHeader('Content-Type', 'application/json') | ||
const orderBook = new OrderBook() | ||
let lastMessageTimestamp: Date | undefined | ||
for await (const message of normalizedMessages) { | ||
if (provideSnapshots) { | ||
if (!lastMessageTimestamp) { | ||
lastMessageTimestamp = message.localTimestamp | ||
} | ||
if ( | ||
(message.localTimestamp as Date).getUTCMinutes() !== lastMessageTimestamp!.getUTCMinutes() || | ||
(message.localTimestamp as Date).getUTCHours() !== lastMessageTimestamp!.getUTCHours() | ||
) { | ||
const timestamp = new Date(lastMessageTimestamp!) | ||
timestamp.setUTCSeconds(0, 0) | ||
const snapshot = { | ||
type: 'book_snapshot.1m', | ||
symbol: message.symbol, | ||
timestamp, | ||
timestampReal: lastMessageTimestamp, | ||
asks: Array.from(orderBook.asks()), | ||
bids: Array.from(orderBook.bids()) | ||
} | ||
const ok = res.write(`${JSON.stringify(snapshot)}\n`) | ||
// let's handle backpressure - https://nodejs.org/api/http.html#http_request_write_chunk_encoding_callback | ||
if (!ok) { | ||
await once(res, 'drain') | ||
} | ||
} | ||
orderBook.update(message) | ||
lastMessageTimestamp = message.localTimestamp | ||
} else { | ||
const ok = res.write(`${JSON.stringify(message)}\n`) | ||
// let's handle backpressure - https://nodejs.org/api/http.html#http_request_write_chunk_encoding_callback | ||
if (!ok) { | ||
await once(res, 'drain') | ||
} | ||
} | ||
} | ||
console.log('end') | ||
res.end('') | ||
}) | ||
// set timeout to 0 meaning infinite http timeout - streaming may take some time especially for longer date ranges | ||
this._httpServer.timeout = 0 | ||
const websocketServer = new WebSocket.Server({ noServer: true }) | ||
this._httpServer.on('upgrade', function upgrade(request, socket, head) { | ||
const pathname = url.parse(request.url).pathname | ||
// setup websocket server for /ws-replay path | ||
if (pathname === '/ws-replay') { | ||
websocketServer.handleUpgrade(request, socket, head, function done(ws) { | ||
websocketServer.emit('connection', ws, request) | ||
}) | ||
if (matchingRoute !== undefined) { | ||
matchingRoute(ws, request) | ||
} else { | ||
socket.destroy() | ||
ws.close(1008) | ||
} | ||
}) | ||
websocketServer.on('connection', this._addToOrCreateNewReplaySession.bind(this)) | ||
this._setUpWebsocketClientsRelatedRoutesRoutes(router) | ||
} | ||
@@ -158,3 +65,3 @@ | ||
if (this.options.clearCache) { | ||
await this._tardisClient.clearCache() | ||
await clearCache() | ||
} | ||
@@ -172,8 +79,8 @@ | ||
if (isDocker() && !process.env.RUNKIT_HOST) { | ||
console.log(`TardisMachine is running inside Docker container...`) | ||
console.log(`tardis-machine is running inside Docker container...`) | ||
} else { | ||
console.log(`TardisMachine is running on ${port} port...`) | ||
console.log(`tardis-machine is running on ${port} port...`) | ||
} | ||
console.log(`Check out https://docs.tardis.dev for help or more information.`) | ||
console.log(`See https://docs.tardis.dev/api/tardis-machine for more information.`) | ||
} | ||
@@ -188,121 +95,2 @@ | ||
} | ||
private async _addToOrCreateNewReplaySession(ws: WebSocket, upgReq: IncomingMessage) { | ||
const parsedQuery = url.parse(upgReq.url!, true).query | ||
const from = parsedQuery['from'] as string | ||
const to = parsedQuery['to'] as string | ||
const replaySessionKey = `${from}-${to}` | ||
// if there are multiple separate ws connections being made for the same date ranges | ||
// in a short time frame (5 seconds), | ||
// consolidate them into a single replay session that will make sure that messages being sent via websocket connections | ||
// are synchronized by local timestamp | ||
const matchingReplaySessionMeta = this._replaySessionsMeta.find(s => s.sessionKey == replaySessionKey && s.session.hasStarted == false) | ||
if (matchingReplaySessionMeta) { | ||
matchingReplaySessionMeta.session.addToSession(new WebsocketConnection(ws, this._tardisClient, parsedQuery)) | ||
} else { | ||
const newReplaySession = new ReplaySession() | ||
const meta = { | ||
sessionKey: replaySessionKey, | ||
session: newReplaySession | ||
} | ||
this._replaySessionsMeta.push(meta) | ||
newReplaySession.addToSession(new WebsocketConnection(ws, this._tardisClient, parsedQuery)) | ||
newReplaySession.onClose(() => { | ||
const toRemove = this._replaySessionsMeta.indexOf(meta) | ||
this._replaySessionsMeta.splice(toRemove, 1) | ||
}) | ||
} | ||
} | ||
private async _writeDataFeedMessagesToResponse( | ||
res: OutgoingMessage, | ||
{ exchange, from, to, filters }: { exchange: any; from: string; to: string; filters: any } | ||
) { | ||
const responsePrefixBuffer = Buffer.from('{"localTimestamp":"') | ||
const responseMiddleBuffer = Buffer.from('","message":') | ||
const responseSuffixBuffer = Buffer.from('}\n') | ||
const BATCH_SIZE = 32 | ||
// not 100% sure that's necessary since we're returning ndjson in fact, not json | ||
res.setHeader('Content-Type', 'application/json') | ||
let buffers: Buffer[] = [] | ||
let totalMessagesCount = 0 | ||
const dataFeedMessages = this._tardisClient.replay({ exchange, from, to, filters, skipDecoding: true }) | ||
for await (let { message, localTimestamp } of dataFeedMessages) { | ||
totalMessagesCount++ | ||
// instead of writing each message directly to response, | ||
// let's batch them and send in BATCH_SIZE batches (each message is 5 buffers: prefix etc) | ||
// also instead of converting messages to string or parsing them let's manually stitch together the desired json response using buffers, which is faster | ||
buffers.push(...[responsePrefixBuffer, localTimestamp, responseMiddleBuffer, message, responseSuffixBuffer]) | ||
if (buffers.length == BATCH_SIZE * 5) { | ||
const ok = res.write(Buffer.concat(buffers)) | ||
buffers = [] | ||
// let's handle backpressure - https://nodejs.org/api/http.html#http_request_write_chunk_encoding_callback | ||
if (!ok) { | ||
await once(res, 'drain') | ||
} | ||
} | ||
} | ||
// write remaining buffers to response | ||
if (buffers.length > 0) { | ||
res.write(Buffer.concat(buffers)) | ||
buffers = [] | ||
} | ||
res.end('') | ||
return totalMessagesCount | ||
} | ||
private _setUpWebsocketClientsRelatedRoutesRoutes(router: findMyWay.Instance<findMyWay.HTTPVersion.V1>) { | ||
// based on https://www.bitmex.com/api/v1/schema/websocketHelp | ||
const bitmexWSHelpResponse = JSON.stringify({ | ||
info: 'See https://www.bitmex.com/app/wsAPI and https://www.bitmex.com/explorer for more documentation.', | ||
usage: 'Send a message in the format: {"op": string, "args": Array<string>}', | ||
ops: ['authKey', 'authKeyExpires', 'cancelAllAfter', 'subscribe', 'unsubscribe'], | ||
subscribe: 'To subscribe, send: {"op": "subscribe", "args": [subscriptionTopic, ...]}.', | ||
subscriptionSubjects: { | ||
authenticationRequired: [], | ||
public: [ | ||
'announcement', | ||
'connected', | ||
'chat', | ||
'publicNotifications', | ||
'instrument', | ||
'settlement', | ||
'funding', | ||
'insurance', | ||
'liquidation', | ||
'orderBookL2', | ||
'orderBookL2_25', | ||
'orderBook10', | ||
'quote', | ||
'trade', | ||
'quoteBin1m', | ||
'quoteBin5m', | ||
'quoteBin1h', | ||
'quoteBin1d', | ||
'tradeBin1m', | ||
'tradeBin5m', | ||
'tradeBin1h', | ||
'tradeBin1d' | ||
] | ||
} | ||
}) | ||
router.on('GET', '/api/v1/schema/websocketHelp', async (_, res) => { | ||
res.setHeader('Content-Type', 'application/json') | ||
res.end(bitmexWSHelpResponse) | ||
}) | ||
} | ||
} | ||
@@ -309,0 +97,0 @@ |
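On the WebSocket side, the beta routes connections by pathname to /ws-replay, /ws-replay-normalized and /ws-stream-normalized, and /ws-replay still consolidates connections sharing the same from/to query range into one replay session. A rough client sketch for /ws-replay using the ws package; the port, the exchange query parameter and the BitMEX-style subscribe payload (modelled on the websocketHelp schema above) are assumptions about the expected message format, not something this diff spells out.

// Sketch: connect to /ws-replay and subscribe BitMEX-style.
import WebSocket from 'ws'

const ws = new WebSocket('ws://localhost:8000/ws-replay?exchange=bitmex&from=2019-05-01&to=2019-05-02')

ws.on('open', () => {
  // payload format documented by the /api/v1/schema/websocketHelp response above
  ws.send(JSON.stringify({ op: 'subscribe', args: ['trade:XBTUSD'] }))
})

ws.on('message', data => {
  console.log(data.toString())
})

ws.on('close', (code, reason) => {
  console.log('replay connection closed', code, reason)
})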
License Policy Violation
License: This package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Major refactor
Supply chain risk: Package has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
+ Added tardis-dev@^7.7.5
+ Added @types/bintrees@1.0.6 (transitive)
+ Added bintrees@1.0.2 (transitive)
+ Added bufferutil@4.0.8 (transitive)
+ Added node-gyp-build@4.8.4 (transitive)
+ Added tardis-dev@7.7.10 (transitive)
+ Added yargs@14.2.3 (transitive)
+ Added yargs-parser@15.0.3 (transitive)
- Removed tardis-client@^2.0.0-alpha.0
- Removed collections@2.0.3 (transitive)
- Removed mini-map@1.0.0 (transitive)
- Removed pop-arrayify@1.0.0 (transitive)
- Removed pop-clear@1.0.0 (transitive)
- Removed pop-clone@1.0.1 (transitive)
- Removed pop-compare@1.0.0 (transitive)
- Removed pop-equals@1.0.0 (transitive)
- Removed pop-has@1.0.0 (transitive)
- Removed pop-hash@1.0.1 (transitive)
- Removed pop-iterate@1.0.1 (transitive)
- Removed pop-observe@2.0.2 (transitive)
- Removed pop-swap@1.0.0 (transitive)
- Removed pop-zip@1.0.0 (transitive)
- Removed regexp-escape@0.0.1 (transitive)
- Removed tardis-client@2.0.0-alpha.0 (transitive)
- Removed weak-map@1.0.8 (transitive)
- Removed yargs@13.3.2 (transitive)
- Removed yargs-parser@13.1.2 (transitive)
Updated @types/ws@^6.0.3
Updated find-my-way@^2.2.1
Updated ws@^7.2.0
Updated yargs@^14.2.0