tardis-dev
Advanced tools
Comparing version 12.4.11 to 12.4.12
@@ -52,5 +52,6 @@ "use strict"; | ||
} | ||
if (!combinedStream.write(message)) | ||
if (!combinedStream.write(message)) { | ||
// Handle backpressure on write | ||
await events_1.once(combinedStream, 'drain'); | ||
} | ||
} | ||
@@ -57,0 +58,0 @@ }); |
@@ -77,3 +77,3 @@ "use strict"; | ||
this.symbolToDepthInfoMapping[symbol] = { | ||
bufferedUpdates: new handy_1.CircularBuffer(200) | ||
bufferedUpdates: new handy_1.CircularBuffer(2000) | ||
}; | ||
@@ -80,0 +80,0 @@ } |
@@ -123,3 +123,3 @@ "use strict"; | ||
this.symbolToMBPInfoMapping[symbol] = { | ||
bufferedUpdates: new handy_1.CircularBuffer(200) | ||
bufferedUpdates: new handy_1.CircularBuffer(20) | ||
}; | ||
@@ -130,5 +130,2 @@ } | ||
if (isSnapshot(message)) { | ||
if (snapshotAlreadyProcessed) { | ||
return; | ||
} | ||
const snapshotBids = message.data.bids.map(this._mapBookLevel); | ||
@@ -165,3 +162,2 @@ const snapshotAsks = message.data.asks.map(this._mapBookLevel); | ||
mbpInfo.snapshotProcessed = true; | ||
mbpInfo.bufferedUpdates.clear(); | ||
yield { | ||
@@ -178,12 +174,11 @@ type: 'book_change', | ||
} | ||
else if (snapshotAlreadyProcessed) { | ||
// snapshot was already processed let's map the mbp message as normal book_change | ||
const update = this._mapMBPUpdate(message, symbol, localTimestamp); | ||
if (update !== undefined) { | ||
yield update; | ||
} | ||
} | ||
else { | ||
// there was no snapshot yet, let's buffer the update | ||
mbpInfo.bufferedUpdates.append(message); | ||
if (snapshotAlreadyProcessed) { | ||
// snapshot was already processed let's map the mbp message as normal book_change | ||
const update = this._mapMBPUpdate(message, symbol, localTimestamp); | ||
if (update !== undefined) { | ||
yield update; | ||
} | ||
} | ||
} | ||
@@ -190,0 +185,0 @@ } |
{ | ||
"name": "tardis-dev", | ||
"version": "12.4.11", | ||
"version": "12.4.12", | ||
"engines": { | ||
@@ -5,0 +5,0 @@ "node": ">=12" |
@@ -71,5 +71,6 @@ import { PassThrough } from 'stream' | ||
if (!combinedStream.write(message)) | ||
if (!combinedStream.write(message)) { | ||
// Handle backpressure on write | ||
await once(combinedStream, 'drain') | ||
} | ||
} | ||
@@ -76,0 +77,0 @@ }) |
@@ -91,3 +91,3 @@ import { debug } from '../debug' | ||
this.symbolToDepthInfoMapping[symbol] = { | ||
bufferedUpdates: new CircularBuffer<BinanceDepthData>(200) | ||
bufferedUpdates: new CircularBuffer<BinanceDepthData>(2000) | ||
} | ||
@@ -94,0 +94,0 @@ } |
@@ -140,3 +140,3 @@ import { BookChange, DerivativeTicker, Exchange, FilterForExchange, Liquidation, Trade } from '../types' | ||
this.symbolToMBPInfoMapping[symbol] = { | ||
bufferedUpdates: new CircularBuffer<HuobiMBPDataMessage>(200) | ||
bufferedUpdates: new CircularBuffer<HuobiMBPDataMessage>(20) | ||
} | ||
@@ -149,5 +149,2 @@ } | ||
if (isSnapshot(message)) { | ||
if (snapshotAlreadyProcessed) { | ||
return | ||
} | ||
const snapshotBids = message.data.bids.map(this._mapBookLevel) | ||
@@ -186,3 +183,2 @@ const snapshotAsks = message.data.asks.map(this._mapBookLevel) | ||
mbpInfo.snapshotProcessed = true | ||
mbpInfo.bufferedUpdates.clear() | ||
@@ -199,11 +195,12 @@ yield { | ||
} as const | ||
} else if (snapshotAlreadyProcessed) { | ||
// snapshot was already processed let's map the mbp message as normal book_change | ||
const update = this._mapMBPUpdate(message, symbol, localTimestamp) | ||
if (update !== undefined) { | ||
yield update | ||
} | ||
} else { | ||
// there was no snapshot yet, let's buffer the update | ||
mbpInfo.bufferedUpdates.append(message) | ||
if (snapshotAlreadyProcessed) { | ||
// snapshot was already processed let's map the mbp message as normal book_change | ||
const update = this._mapMBPUpdate(message, symbol, localTimestamp) | ||
if (update !== undefined) { | ||
yield update | ||
} | ||
} | ||
} | ||
@@ -210,0 +207,0 @@ } |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
358
1048856
18820