tardis-node
Advanced tools
Comparing version 5.0.0 to 6.0.0
@@ -19,10 +19,11 @@ "use strict"; | ||
for (const computable of computables) { | ||
// any time new message arrives check if given computable has | ||
// new sample for such message timestamp, eg: time based trade bars | ||
if (computable.hasNewSample(timestamp)) { | ||
yield computable.getSample(localTimestamp); | ||
} | ||
// update computable with new data if data types match | ||
// and check if such computable after update has new sample as well | ||
// any time new message arrives check if given computable | ||
// source data types include message type and | ||
// has new sample for such message timestamp, eg: time based trade bars | ||
if (computable.sourceDataTypes.includes(message.type)) { | ||
if (computable.hasNewSample(timestamp)) { | ||
yield computable.getSample(localTimestamp); | ||
} | ||
// update computable with new data | ||
// and check if such computable after update has new sample as well | ||
computable.update(message); | ||
@@ -29,0 +30,0 @@ if (computable.hasNewSample(timestamp)) { |
@@ -7,3 +7,3 @@ "use strict"; | ||
canHandle(message) { | ||
const channel = message.params && message.params.channel; | ||
const channel = message.params !== undefined ? message.params.channel : undefined; | ||
if (channel === undefined) { | ||
@@ -10,0 +10,0 @@ return false; |
@@ -16,3 +16,3 @@ /// <reference types="node" /> | ||
}[]>; | ||
replay<T extends Exchange, U extends boolean = false, Z extends boolean = false>({ exchange, from, to, filters, skipDecoding, returnDisconnectsAsUndefined }: ReplayOptions<T, U, Z>): AsyncIterableIterator<Z extends true ? U extends true ? { | ||
replay<T extends Exchange, U extends boolean = false, Z extends boolean = false>({ exchange, from, to, filters, skipDecoding, withDisconnects }: ReplayOptions<T, U, Z>): AsyncIterableIterator<Z extends true ? U extends true ? { | ||
localTimestamp: Buffer; | ||
@@ -31,3 +31,3 @@ message: Buffer; | ||
replayNormalized<T extends Exchange, U extends MapperFactory<T, any>[], Z extends boolean = false>({ exchange, symbols, from, to, withDisconnectMessages }: ReplayNormalizedOptions<T, Z>, ...normalizers: U): AsyncIterableIterator<Z extends true ? (U extends MapperFactory<infer _, infer X>[] ? X | Disconnect : never) : (U extends MapperFactory<infer _, infer X>[] ? X : never)>; | ||
stream<T extends Exchange, U extends boolean = false>({ exchange, filters, timeoutIntervalMS, returnDisconnectsAsUndefined }: StreamOptions<T, U>): AsyncIterableIterator<U extends true ? { | ||
stream<T extends Exchange, U extends boolean = false>({ exchange, filters, timeoutIntervalMS, withDisconnects }: StreamOptions<T, U>): AsyncIterableIterator<U extends true ? { | ||
localTimestamp: Date; | ||
@@ -41,2 +41,3 @@ message: any; | ||
private _normalize; | ||
private _getFilters; | ||
private _validateReplayOptions; | ||
@@ -57,3 +58,3 @@ private _validateStreamOptions; | ||
skipDecoding?: U; | ||
returnDisconnectsAsUndefined?: Z; | ||
withDisconnects?: Z; | ||
}; | ||
@@ -71,3 +72,3 @@ export declare type ReplayNormalizedOptions<T extends Exchange, U extends boolean = false> = { | ||
timeoutIntervalMS?: number; | ||
returnDisconnectsAsUndefined?: U; | ||
withDisconnects?: U; | ||
}; | ||
@@ -74,0 +75,0 @@ export declare type StreamNormalizedOptions<T extends Exchange, U extends boolean = false> = { |
@@ -52,3 +52,3 @@ "use strict"; | ||
} | ||
async *replay({ exchange, from, to, filters, skipDecoding = undefined, returnDisconnectsAsUndefined = undefined }) { | ||
async *replay({ exchange, from, to, filters, skipDecoding = undefined, withDisconnects = undefined }) { | ||
this._validateReplayOptions(exchange, from, to, filters); | ||
@@ -135,6 +135,6 @@ const fromDate = handy_1.parseAsUTCDate(from); | ||
} | ||
// ignore empty lines unless returnDisconnectsAsUndefined is set to true | ||
// ignore empty lines unless withDisconnects is set to true | ||
// do not yield subsequent undefined messages | ||
} | ||
else if (returnDisconnectsAsUndefined === true && lastMessageWasUndefined === false) { | ||
else if (withDisconnects === true && lastMessageWasUndefined === false) { | ||
lastMessageWasUndefined = true; | ||
@@ -146,3 +146,3 @@ yield undefined; | ||
// do not yield subsequent undefined messages eg: two empty slices produce single undefined/disconnect message | ||
if (linesCount === 0 && returnDisconnectsAsUndefined === true && lastMessageWasUndefined === false) { | ||
if (linesCount === 0 && withDisconnects === true && lastMessageWasUndefined === false) { | ||
lastMessageWasUndefined = true; | ||
@@ -167,3 +167,3 @@ yield undefined; | ||
const nonFilterableExchanges = ['bitfinex', 'bitfinex-derivatives']; | ||
const filters = nonFilterableExchanges.includes(exchange) ? [] : createMappers().flatMap(mapper => mapper.getFilters(symbols)); | ||
const filters = nonFilterableExchanges.includes(exchange) ? [] : this._getFilters(createMappers(), symbols); | ||
const messages = this.replay({ | ||
@@ -173,3 +173,3 @@ exchange, | ||
to, | ||
returnDisconnectsAsUndefined: true, | ||
withDisconnects: true, | ||
filters | ||
@@ -179,3 +179,3 @@ }); | ||
} | ||
async *stream({ exchange, filters, timeoutIntervalMS = 10000, returnDisconnectsAsUndefined = undefined }) { | ||
async *stream({ exchange, filters, timeoutIntervalMS = 10000, withDisconnects = undefined }) { | ||
this._validateStreamOptions(filters); | ||
@@ -194,3 +194,3 @@ const realTimeFeed = realtimefeeds_1.createRealTimeFeed(exchange); | ||
} | ||
else if (returnDisconnectsAsUndefined) { | ||
else if (withDisconnects) { | ||
yield undefined; | ||
@@ -207,6 +207,6 @@ } | ||
const createMappers = () => normalizers.map(m => m(exchange)); | ||
const filters = createMappers().flatMap(mapper => mapper.getFilters(symbols)); | ||
const filters = this._getFilters(createMappers(), symbols); | ||
const messages = this.stream({ | ||
exchange, | ||
returnDisconnectsAsUndefined: true, | ||
withDisconnects: true, | ||
timeoutIntervalMS, | ||
@@ -255,2 +255,25 @@ filters | ||
} | ||
_getFilters(mappers, symbols) { | ||
const filters = mappers.flatMap(mapper => mapper.getFilters(symbols)); | ||
const deduplicatedFilters = filters.reduce((prev, current) => { | ||
const matchingExisting = prev.find(c => c.channel === current.channel); | ||
if (matchingExisting !== undefined) { | ||
if (matchingExisting.symbols !== undefined && current.symbols) { | ||
for (let symbol of current.symbols) { | ||
if (matchingExisting.symbols.includes(symbol) === false) { | ||
matchingExisting.symbols.push(symbol); | ||
} | ||
} | ||
} | ||
else if (current.symbols) { | ||
matchingExisting.symbols = [...current.symbols]; | ||
} | ||
} | ||
else { | ||
prev.push(current); | ||
} | ||
return prev; | ||
}, []); | ||
return deduplicatedFilters; | ||
} | ||
_validateReplayOptions(exchange, from, to, filters) { | ||
@@ -257,0 +280,0 @@ if (!exchange || consts_1.EXCHANGES.includes(exchange) === false) { |
@@ -1,27 +0,32 @@ | ||
const { tardis, combine } = require('tardis-node') | ||
const { tardis, normalizeTrades, compute, computeTradeBars } = require('tardis-node') | ||
async function replayCombined() { | ||
const bitmexMessages = tardis.replayNormalized({ | ||
exchange: 'bitmex', | ||
dataTypes: ['trade', 'book_change'], | ||
symbols: ['XBTUSD'], | ||
from: '2019-05-01', | ||
to: '2019-05-01 03:00' | ||
}) | ||
async function produceVolumeBasedTradeBars(messages) { | ||
const withVolumeTradeBars = compute( | ||
messages, | ||
computeTradeBars({ | ||
kind: 'volume', | ||
interval: 100 * 1000 // aggregate by 100k contracts volume | ||
}) | ||
) | ||
const deribitMessages = tardis.replayNormalized({ | ||
exchange: 'deribit', | ||
dataTypes: ['trade', 'book_change'], | ||
symbols: ['BTC-PERPETUAL'], | ||
from: '2019-05-01', | ||
to: '2019-05-01 03:00' | ||
}) | ||
const combinedStream = combine(bitmexMessages, deribitMessages) | ||
for await (const message of combinedStream) { | ||
console.log(message) | ||
for await (const message of withVolumeTradeBars) { | ||
if (message.type === 'trade_bar') { | ||
console.log(message.name, message) | ||
} | ||
} | ||
} | ||
await replayCombined() | ||
const historicalMessages = tardis.replayNormalized( | ||
{ exchange: 'bitmex', symbols: ['XBTUSD'], from: '2019-08-01', to: '2019-08-02' }, | ||
normalizeTrades | ||
) | ||
const realTimeMessages = tardis.streamNormalized( | ||
{ exchange: 'bitmex', symbols: ['XBTUSD'] }, | ||
normalizeTrades | ||
) | ||
await produceVolumeBasedTradeBars(historicalMessages) | ||
// or for real time data | ||
// await produceVolumeBasedTradeBars(realTimeMessages) |
{ | ||
"name": "tardis-node", | ||
"version": "5.0.0", | ||
"version": "6.0.0", | ||
"engines": { | ||
"node": ">=12" | ||
}, | ||
"description": "tardis-node library provides fast and convenient access to tick-level real-time and historical cryptocurrency market data.", | ||
"description": "Fast and convenient access to tick-level historical and real-time cryptocurrency market data via Node.js", | ||
"main": "dist/index.js", | ||
@@ -12,3 +12,3 @@ "source": "src/index.ts", | ||
"repository": "tardis-dev/tardis-node", | ||
"homepage": "https://docs.tardis.dev/api/tardis-node", | ||
"homepage": "https://docs.tardis.dev/api/node-js", | ||
"scripts": { | ||
@@ -37,2 +37,5 @@ "build": "tsc", | ||
"normalized cryptocurrency market data API", | ||
"order book reconstruction", | ||
"market data normalization", | ||
"cryptocurrency api", | ||
"cryptocurrency", | ||
@@ -83,3 +86,6 @@ "exchange", | ||
}, | ||
"runkitExampleFilename": "example.js" | ||
"runkitExampleFilename": "example.js", | ||
"optionalDependencies": { | ||
"bufferutil": "^4.0.1" | ||
} | ||
} |
319
README.md
@@ -6,17 +6,46 @@ # tardis-node | ||
Tardis Node library provides fast and convenient access to tick-level real-time and historical cryptocurrency market data. | ||
`Tardis-node` library provides convenient access to tick-level historical and real-time cryptocurrency market data both in exchange native and normalized formats. Instead of using callbacks it uses [`async iterables`](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Statements/for-await...of) that can be iterated via [`for await ...of`](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Statements/for-await...of) loop and that enables composability features like [seamless switching between real-time data streaming and historical data replay](https://docs.tardis.dev/api/node-js#seamless-switching-between-real-time-streaming-and-historical-market-data-replay) or [computing derived data locally](https://docs.tardis.dev/api/node-js#computing-derived-data-locally). | ||
Built-in support for: | ||
<br/> | ||
- real-time streaming market data with unified interface for connecting to public exchanges WebSocket APIs | ||
- historical market data replay backed by [tardis.dev](https://tardis.dev) API | ||
- both exchange native and normalized\* market data format | ||
- top cryptocurrency exchanges | ||
- automatic reconnection and stale connections detection logic for real-time streams | ||
- combining multiple exchanges feeds into single one | ||
- computing custom trade bins/bars and book snapshots client-side (eg: volume based bars, top 20 levels 100ms order book snapshots etc.) | ||
- full limit order book reconstruction, both for real-time and historical data | ||
```javascript | ||
const { tardis, normalizeTrades, normalizeBookChanges } = require('tardis-node') | ||
const messages = tardis.streamNormalized( | ||
{ | ||
exchange: 'bitmex', | ||
symbols: ['XBTUSD', 'ETHUSD'] | ||
}, | ||
normalizeTrades, | ||
normalizeBookChanges | ||
) | ||
for await (const message of messages) { | ||
console.log(message) | ||
} | ||
``` | ||
[![Try this code live on RunKit](https://img.shields.io/badge/-Try%20this%20code%20live%20on%20RunKit-c?color=5558be)](https://runkit.com/thad/tardis-node-stream-real-time-market-data) | ||
<br/> | ||
<br/> | ||
## Features | ||
- [real-time streaming](https://docs.tardis.dev/api/node-js#tardis-streamnormalized-options-normalizers) of tick-level market data with unified API for connecting directly to exchanges public WebSocket APIs without any intermediary/3rd party proxy | ||
- historical tick-level [market data replay](https://docs.tardis.dev/api/node-js#tardis-replaynormalized-options-normalizers) backed by [tardis.dev HTTP API](https://docs.tardis.dev/api/http#data-feeds-exchange) | ||
- support for both exchange native and [normalized market data](https://docs.tardis.dev/api/node-js#data-normalization) formats \(consistent format for accessing market data across multiple exchanges — normalized trades, order book and ticker data\) | ||
- [seamless switching between real-time streaming and historical market data replay](https://docs.tardis.dev/api/node-js#seamless-switching-between-real-time-streaming-and-historical-market-data-replay) thanks to [`async iterables`](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Statements/for-await...of) providing unified way of consuming data messages | ||
- transparent historical local data caching \(cached data is stored on disk in compressed GZIP format and decompressed on demand when reading the data\) | ||
- support for top cryptocurrency exchanges: BitMEX, Binance, Binance Futures, Deribit, Bitfinex, bitFlyer, Bitstamp, Coinbase Pro, Crypto Facilities, Gemini, FTX, Kraken and OKEx. | ||
- automatic closed connections and stale connections reconnection logic for real-time streams | ||
- [combining multiple exchanges feeds into single one](https://docs.tardis.dev/api/node-js#combining-data-streams) via [`combine`](https://docs.tardis.dev/api/node-js#combine-iterators) helper function — synchronized historical market data replay and consolidated real-time data streaming from multiple exchanges | ||
- [computing derived data locally](https://docs.tardis.dev/api/node-js#computing-derived-data-locally) like trade bars and book snapshots via [`compute`](https://docs.tardis.dev/api/node-js#compute-iterator-computables) helper function and `computables`, e.g., volume based bars, top 20 levels order book snapshots taken every 10 ms etc. | ||
- [full limit order book reconstruction](https://docs.tardis.dev/api/node-js#limit-order-book-reconstruction) both for real-time and historical data via `OrderBook` object | ||
- fast and lightweight architecture — low memory footprint and no heavy in-memory buffering | ||
- [extensible mapping logic](https://docs.tardis.dev/api/node-js#modifying-built-in-and-adding-custom-normalizers) that allows adjusting normalized formats for specific needs | ||
- built-in TypeScript support | ||
\* normalized: consistent format for accessing market data across multiple exchanges -normalized trade, order book L2 and ticker data | ||
<br/> | ||
<br/> | ||
@@ -27,202 +56,166 @@ ## Installation | ||
```sh | ||
```bash | ||
npm install tardis-node --save | ||
``` | ||
## Documentation | ||
<br/> | ||
<br/> | ||
See the [tardis-node docs](https://docs.tardis.dev/api/tardis-node). | ||
## Debugging and logging | ||
## Usage | ||
`tardis-node` lib uses [debug](https://github.com/visionmedia/debug) package for verbose logging and debugging purposes that can be enabled via `DEBUG` environment variable set to `tardis-node*`. | ||
### Stream real-time market data in exchange native data format | ||
<br/> | ||
<br/> | ||
```js | ||
const { tardis } = require('tardis-node') | ||
## Documentation | ||
async function stream() { | ||
const messages = tardis.stream({ | ||
exchange: 'bitmex', | ||
filters: [{ channel: 'trade', symbols: ['XBTUSD'] }, { channel: 'orderBookL2', symbols: ['XBTUSD'] }] | ||
}) | ||
See the official [tardis-node docs](https://docs.tardis.dev/api/node-js). | ||
for await (const { message, localTimestamp } of messages) { | ||
console.log(message) | ||
} | ||
} | ||
<br/> | ||
<br/> | ||
stream() | ||
``` | ||
## Examples | ||
### Replay historical market data in exchange native data format | ||
### Real-time spread across multiple exchanges | ||
```js | ||
const { tardis } = require('tardis-node') | ||
Example showing how to quickly display real-time spread and best bid/ask info across multiple exchanges at once. It can be easily adapted to do the same for historical data \(`replayNormalized` instead of `streamNormalized`). | ||
async function replay() { | ||
const messages = tardis.replay({ | ||
exchange: 'bitmex', | ||
filters: [{ channel: 'trade', symbols: ['XBTUSD'] }, { channel: 'orderBookL2', symbols: ['XBTUSD'] }], | ||
from: '2019-05-01', | ||
to: '2019-05-02' | ||
}) | ||
```javascript | ||
const { tardis, normalizeBookChanges, combine, compute, | ||
computeBookSnapshots } = require('tardis-node') | ||
for await (const { message, localTimestamp } of messages) { | ||
console.log(message) | ||
} | ||
} | ||
const exchangesToStream = [ | ||
{ exchange: 'bitmex', symbols: ['XBTUSD'] }, | ||
{ exchange: 'deribit', symbols: ['BTC-PERPETUAL'] }, | ||
{ exchange: 'cryptofacilities', symbols: ['PI_XBTUSD'] } | ||
] | ||
// for each specified exchange call streamNormalized for it | ||
// so we have multiple real-time streams for all specified exchanges | ||
const realTimeStreams = exchangesToStream.map(e => { | ||
return tardis.streamNormalized(e, normalizeBookChanges) | ||
}) | ||
replay() | ||
``` | ||
// combine all real-time message streams into one | ||
const messages = combine(...realTimeStreams) | ||
### Stream real-time market data in normalized data format | ||
// create book snapshots with depth1 that are produced | ||
// every time best bid/ask info is changed | ||
// effectively computing real-time quotes | ||
const realTimeQuoteComputable = computeBookSnapshots({ | ||
depth: 1, | ||
interval: 0, | ||
name: 'realtime_quote' | ||
}) | ||
```js | ||
const { tardis, normalizeTrades, normalizeBookChanges } = require('tardis-node') | ||
// compute real-time quotes for combines real-time messages | ||
const messagesWithQuotes = compute(messages, realTimeQuoteComputable) | ||
async function streamNormalized() { | ||
const messages = tardis.streamNormalized( | ||
{ | ||
exchange: 'bitmex', | ||
symbols: ['XBTUSD'] | ||
}, | ||
normalizeTrades, | ||
normalizeBookChanges | ||
) | ||
const spreads = {} | ||
for await (const message of messages) { | ||
console.log(message) | ||
// print spreads info every 100ms | ||
setInterval(() => { | ||
console.clear() | ||
console.log(spreads) | ||
}, 100) | ||
// update spreads info real-time | ||
for await (const message of messagesWithQuotes) { | ||
if (message.type === 'book_snapshot') { | ||
spreads[message.exchange] = { | ||
spread: message.asks[0].price - message.bids[0].price, | ||
bestBid: message.bids[0], | ||
bestAsk: message.asks[0] | ||
} | ||
} | ||
} | ||
streamNormalized() | ||
``` | ||
### Replay historical market data in normalized data format | ||
[![Try this code live on RunKit](https://img.shields.io/badge/-Try%20this%20code%20live%20on%20RunKit-c?color=5558be)](https://runkit.com/thad/tardis-node-real-time-spread-for-multiple-exchanges) | ||
```js | ||
const { tardis, normalizeTrades, normalizeBookChanges } = require('tardis-node') | ||
<br/> | ||
async function replayNormalized() { | ||
const messages = tardis.replayNormalized( | ||
{ | ||
exchange: 'bitmex', | ||
symbols: ['XBTUSD'], | ||
from: '2019-05-01', | ||
to: '2019-05-02' | ||
}, | ||
normalizeTrades, | ||
normalizeBookChanges | ||
### Seamless switching between real-time streaming and historical market data replay | ||
Example showing simple pattern of providing `async iterable` of market data messages to the function that process them no matter if it's is real-time or historical market data. This allows having the same logic for example for both back-testing and live trading. | ||
```javascript | ||
const { tardis, normalizeTrades, compute, computeTradeBars } = require('tardis-node') | ||
async function produceVolumeBasedTradeBars(messages) { | ||
const withVolumeTradeBars = compute( | ||
messages, | ||
computeTradeBars({ | ||
kind: 'volume', | ||
interval: 100 * 1000 // aggregate by 100k contracts volume | ||
}) | ||
) | ||
for await (const message of messages) { | ||
console.log(message) | ||
for await (const message of withVolumeTradeBars) { | ||
if (message.type === 'trade_bar') { | ||
console.log(message.name, message) | ||
} | ||
} | ||
} | ||
replayNormalized() | ||
``` | ||
const historicalMessages = tardis.replayNormalized( | ||
{ exchange: 'bitmex', symbols: ['XBTUSD'], from: '2019-08-01', to: '2019-08-02' }, | ||
normalizeTrades | ||
) | ||
### Combine two historical exchange market data feeds | ||
const realTimeMessages = tardis.streamNormalized({ exchange: 'bitmex', symbols: ['XBTUSD'] }, normalizeTrades) | ||
Returns single messages 'stream' that is ordered by `localTimestamp`. | ||
It works the same way for real-time market data as well, but messages are returned in FIFO manner. | ||
await produceVolumeBasedTradeBars(historicalMessages) | ||
```js | ||
const { tardis, normalizeTrades, normalizeBookChanges, combine } = require('tardis-node') | ||
// or for real time data | ||
// await produceVolumeBasedTradeBars(realTimeMessages) | ||
``` | ||
async function replayCombined() { | ||
const bitmexMessages = tardis.replayNormalized( | ||
{ | ||
exchange: 'bitmex', | ||
symbols: ['XBTUSD'], | ||
from: '2019-05-01', | ||
to: '2019-05-02' | ||
}, | ||
normalizeTrades, | ||
normalizeBookChanges | ||
) | ||
[![Try this code live on RunKit](https://img.shields.io/badge/-Try%20this%20code%20live%20on%20RunKit-c?color=5558be)](https://runkit.com/thad/tardis-node-seamless-switching-between-real-time-streaming-and-historical-market-data-replay) | ||
const deribitMessages = tardis.replayNormalized( | ||
{ | ||
exchange: 'deribit', | ||
symbols: ['BTC-PERPETUAL'], | ||
from: '2019-05-01', | ||
to: '2019-05-02' | ||
}, | ||
normalizeTrades, | ||
normalizeBookChanges | ||
) | ||
<br/> | ||
const combinedStream = combine(bitmexMessages, deribitMessages) | ||
### Stream real-time market data in exchange native data format | ||
for await (const message of combinedStream) { | ||
console.log(message) | ||
} | ||
```javascript | ||
const messages = tardis.stream({ | ||
exchange: 'bitmex', | ||
filters: [ | ||
{ channel: 'trade', symbols: ['XBTUSD'] }, | ||
{ channel: 'orderBookL2', symbols: ['XBTUSD'] } | ||
] | ||
}) | ||
for await (const message of messages) { | ||
console.log(message) | ||
} | ||
replayCombined() | ||
``` | ||
### Compute 10 seconds trade bins and top 5 levels book snapshots every 2 seconds for real-time market data stream | ||
[![Try this code live on RunKit](https://img.shields.io/badge/-Try%20this%20code%20live%20on%20RunKit-b?color=5558be)](https://runkit.com/thad/tardis-node-stream-market-data) | ||
```js | ||
const { tardis, normalizeTrades, normalizeBookChanges, compute, computeTradeBars, computeBookSnapshots } = require('tardis-node') | ||
<br/> | ||
async function streamComputed() { | ||
const bitmexMessages = tardis.streamNormalized( | ||
{ | ||
exchange: 'bitmex', | ||
symbols: ['XBTUSD'] | ||
}, | ||
normalizeTrades, | ||
normalizeBookChanges | ||
) | ||
### Replay historical market data in exchange native data format | ||
const messagesWithComputedTypes = compute( | ||
bitmexMessages, | ||
computeTradeBars({ kind: 'time', interval: 10 * 1000 }), | ||
computeBookSnapshots({ depth: 5, interval: 2 * 1000 }) | ||
) | ||
```javascript | ||
const messages = tardis.replay({ | ||
exchange: 'bitmex', | ||
filters: [ | ||
{ channel: 'trade', symbols: ['XBTUSD'] }, | ||
{ channel: 'orderBookL2', symbols: ['XBTUSD'] } | ||
], | ||
from: '2019-05-01', | ||
to: '2019-05-02' | ||
}) | ||
for await (const message of messagesWithComputedTypes) { | ||
if (message.type === 'book_snapshot' || message.type === 'trade_bar') { | ||
console.log(message) | ||
} | ||
} | ||
for await (const message of messages) { | ||
console.log(message) | ||
} | ||
streamComputed() | ||
``` | ||
### Reconstruct historical limit order book at any point in time | ||
[![Try this code live on RunKit](https://img.shields.io/badge/-Try%20this%20code%20live%20on%20RunKit-b?color=5558be)](https://runkit.com/thad/tardis-node-replay-market-data) | ||
It works in the same way for real-time market data. | ||
<br/> | ||
<br/> | ||
```js | ||
const { tardis, normalizeTrades, normalizeBookChanges, OrderBook } = require('tardis-node') | ||
async function reconstructLOB() { | ||
const bitmexXBTMessages = tardis.replayNormalized( | ||
{ | ||
exchange: 'bitmex', | ||
symbols: ['XBTUSD'], | ||
from: '2019-05-01', | ||
to: '2019-05-02' | ||
}, | ||
normalizeTrades, | ||
normalizeBookChanges | ||
) | ||
const orderBook = new OrderBook() | ||
for await (const message of bitmexXBTMessages) { | ||
if (message.type === 'book_change') { | ||
orderBook.update(message) | ||
} | ||
console.log(message.localTimestamp.toISOString(), orderBook.bestAsk(), orderBook.bestBid()) | ||
// or orderBook.bids(), orderBook.asks() to get all levels at any given point in time | ||
} | ||
} | ||
reconstructLOB() | ||
``` | ||
## See the [tardis-node docs](https://docs.tardis.dev/api/node-js) for more examples. |
@@ -39,12 +39,12 @@ import { Exchange, NormalizedData, Disconnect } from '../types' | ||
for (const computable of computables) { | ||
// any time new message arrives check if given computable has | ||
// new sample for such message timestamp, eg: time based trade bars | ||
// any time new message arrives check if given computable | ||
// source data types include message type and | ||
// has new sample for such message timestamp, eg: time based trade bars | ||
if (computable.sourceDataTypes.includes(message.type)) { | ||
if (computable.hasNewSample(timestamp)) { | ||
yield computable.getSample(localTimestamp) | ||
} | ||
if (computable.hasNewSample(timestamp)) { | ||
yield computable.getSample(localTimestamp) | ||
} | ||
// update computable with new data if data types match | ||
// and check if such computable after update has new sample as well | ||
if (computable.sourceDataTypes.includes(message.type)) { | ||
// update computable with new data | ||
// and check if such computable after update has new sample as well | ||
computable.update(message) | ||
@@ -51,0 +51,0 @@ if (computable.hasNewSample(timestamp)) { |
@@ -8,3 +8,3 @@ import { DerivativeTicker, Trade, BookChange } from '../types' | ||
canHandle(message: any) { | ||
const channel = message.params && (message.params.channel as string | undefined) | ||
const channel = message.params !== undefined ? (message.params.channel as string | undefined) : undefined | ||
if (channel === undefined) { | ||
@@ -11,0 +11,0 @@ return false |
@@ -74,3 +74,3 @@ import os from 'os' | ||
skipDecoding = undefined, | ||
returnDisconnectsAsUndefined = undefined | ||
withDisconnects = undefined | ||
}: ReplayOptions<T, U, Z>): AsyncIterableIterator< | ||
@@ -186,5 +186,5 @@ Z extends true | ||
} | ||
// ignore empty lines unless returnDisconnectsAsUndefined is set to true | ||
// ignore empty lines unless withDisconnects is set to true | ||
// do not yield subsequent undefined messages | ||
} else if (returnDisconnectsAsUndefined === true && lastMessageWasUndefined === false) { | ||
} else if (withDisconnects === true && lastMessageWasUndefined === false) { | ||
lastMessageWasUndefined = true | ||
@@ -196,3 +196,3 @@ yield undefined as any | ||
// do not yield subsequent undefined messages eg: two empty slices produce single undefined/disconnect message | ||
if (linesCount === 0 && returnDisconnectsAsUndefined === true && lastMessageWasUndefined === false) { | ||
if (linesCount === 0 && withDisconnects === true && lastMessageWasUndefined === false) { | ||
lastMessageWasUndefined = true | ||
@@ -242,3 +242,3 @@ yield undefined as any | ||
const nonFilterableExchanges = ['bitfinex', 'bitfinex-derivatives'] | ||
const filters = nonFilterableExchanges.includes(exchange) ? [] : createMappers().flatMap(mapper => mapper.getFilters(symbols)) | ||
const filters = nonFilterableExchanges.includes(exchange) ? [] : this._getFilters(createMappers(), symbols) | ||
@@ -249,3 +249,3 @@ const messages = this.replay({ | ||
to, | ||
returnDisconnectsAsUndefined: true, | ||
withDisconnects: true, | ||
filters | ||
@@ -261,3 +261,3 @@ }) | ||
timeoutIntervalMS = 10000, | ||
returnDisconnectsAsUndefined = undefined | ||
withDisconnects = undefined | ||
}: StreamOptions<T, U>): AsyncIterableIterator< | ||
@@ -282,3 +282,3 @@ U extends true ? { localTimestamp: Date; message: any } | undefined : { localTimestamp: Date; message: any } | ||
} as any | ||
} else if (returnDisconnectsAsUndefined) { | ||
} else if (withDisconnects) { | ||
yield undefined as any | ||
@@ -304,7 +304,7 @@ } | ||
const createMappers = () => normalizers.map(m => m(exchange)) | ||
const filters = createMappers().flatMap(mapper => mapper.getFilters(symbols)) | ||
const filters = this._getFilters(createMappers(), symbols) | ||
const messages = this.stream({ | ||
exchange, | ||
returnDisconnectsAsUndefined: true, | ||
withDisconnects: true, | ||
timeoutIntervalMS, | ||
@@ -369,2 +369,30 @@ filters | ||
private _getFilters<T extends Exchange>(mappers: Mapper<T, any>[], symbols?: string[]) { | ||
const filters = mappers.flatMap(mapper => mapper.getFilters(symbols)) | ||
const deduplicatedFilters = filters.reduce( | ||
(prev, current) => { | ||
const matchingExisting = prev.find(c => c.channel === current.channel) | ||
if (matchingExisting !== undefined) { | ||
if (matchingExisting.symbols !== undefined && current.symbols) { | ||
for (let symbol of current.symbols) { | ||
if (matchingExisting.symbols.includes(symbol) === false) { | ||
matchingExisting.symbols.push(symbol) | ||
} | ||
} | ||
} else if (current.symbols) { | ||
matchingExisting.symbols = [...current.symbols] | ||
} | ||
} else { | ||
prev.push(current) | ||
} | ||
return prev | ||
}, | ||
[] as FilterForExchange[T][] | ||
) | ||
return deduplicatedFilters | ||
} | ||
private _validateReplayOptions<T extends Exchange>(exchange: T, from: string, to: string, filters: FilterForExchange[T][]) { | ||
@@ -435,3 +463,3 @@ if (!exchange || EXCHANGES.includes(exchange) === false) { | ||
skipDecoding?: U | ||
returnDisconnectsAsUndefined?: Z | ||
withDisconnects?: Z | ||
} | ||
@@ -451,3 +479,3 @@ | ||
timeoutIntervalMS?: number | ||
returnDisconnectsAsUndefined?: U | ||
withDisconnects?: U | ||
} | ||
@@ -454,0 +482,0 @@ |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
510848
8835
7
220