serum-vial
Advanced tools
Comparing version 0.8.4 to 0.8.5
@@ -171,3 +171,3 @@ "use strict"; | ||
}; | ||
await this._send(ws, JSON.stringify(errorMessage)); | ||
await this._send(ws, () => JSON.stringify(errorMessage)); | ||
return; | ||
@@ -187,3 +187,3 @@ } | ||
}; | ||
await this._send(ws, JSON.stringify(errorMessage)); | ||
await this._send(ws, () => JSON.stringify(errorMessage)); | ||
return; | ||
@@ -198,3 +198,3 @@ } | ||
}; | ||
await this._send(ws, JSON.stringify(confirmationMessage)); | ||
await this._send(ws, () => JSON.stringify(confirmationMessage)); | ||
// 'unpack' channel to specific message types that will be published for it | ||
@@ -209,3 +209,3 @@ const requestedTypes = consts_1.MESSAGE_TYPES_PER_CHANNEL[request.channel]; | ||
if (recentTrades !== undefined) { | ||
await this._send(ws, recentTrades); | ||
await this._send(ws, () => this._recentTradesSerialized[market]); | ||
} | ||
@@ -219,22 +219,13 @@ else { | ||
}; | ||
await this._send(ws, JSON.stringify(emptyRecentTradesMessage)); | ||
await this._send(ws, () => JSON.stringify(emptyRecentTradesMessage)); | ||
} | ||
} | ||
if (type === 'quote') { | ||
const quote = this._quotesSerialized[market]; | ||
if (quote !== undefined) { | ||
await this._send(ws, quote); | ||
} | ||
await this._send(ws, () => this._quotesSerialized[market]); | ||
} | ||
if (type == 'l2snapshot') { | ||
const l2Snapshot = this._l2SnapshotsSerialized[market]; | ||
if (l2Snapshot !== undefined) { | ||
await this._send(ws, l2Snapshot); | ||
} | ||
await this._send(ws, () => this._l2SnapshotsSerialized[market]); | ||
} | ||
if (type === 'l3snapshot') { | ||
const l3Snapshot = this._l3SnapshotsSerialized[market]; | ||
if (l3Snapshot !== undefined) { | ||
await this._send(ws, l3Snapshot); | ||
} | ||
await this._send(ws, () => this._l3SnapshotsSerialized[market]); | ||
} | ||
@@ -255,3 +246,4 @@ ws.subscribe(topic); | ||
const message = 'Subscription request internal error'; | ||
logger_1.logger.log('info', `${message} , ${err.message} ${err.stack}`, meta); | ||
const errorMessage = typeof err === 'string' ? err : `${err.message}, ${err.stack}`; | ||
logger_1.logger.log('info', `${message}, ${errorMessage}`, meta); | ||
try { | ||
@@ -263,12 +255,15 @@ ws.end(1011, message); | ||
} | ||
async _send(ws, message) { | ||
let elapsed = 0; | ||
async _send(ws, getMessage) { | ||
let retries = 0; | ||
while (ws.getBufferedAmount() > 0) { | ||
await helpers_1.wait(5); | ||
elapsed += 5; | ||
if (elapsed > 500) { | ||
await helpers_1.wait(2); | ||
retries += 1; | ||
if (retries > 300) { | ||
ws.end(1008, 'Too much backpressure'); | ||
} | ||
} | ||
ws.send(message); | ||
const message = getMessage(); | ||
if (message !== undefined) { | ||
ws.send(message); | ||
} | ||
} | ||
@@ -275,0 +270,0 @@ _validateRequestPayload(message) { |
{ | ||
"name": "serum-vial", | ||
"version": "0.8.4", | ||
"version": "0.8.5", | ||
"engines": { | ||
@@ -5,0 +5,0 @@ "node": ">=15" |
@@ -223,3 +223,3 @@ import { getLayoutVersion, Market } from '@project-serum/serum' | ||
await this._send(ws, JSON.stringify(errorMessage)) | ||
await this._send(ws, () => JSON.stringify(errorMessage)) | ||
@@ -244,3 +244,3 @@ return | ||
await this._send(ws, JSON.stringify(errorMessage)) | ||
await this._send(ws, () => JSON.stringify(errorMessage)) | ||
@@ -259,3 +259,3 @@ return | ||
await this._send(ws, JSON.stringify(confirmationMessage)) | ||
await this._send(ws, () => JSON.stringify(confirmationMessage)) | ||
@@ -272,3 +272,3 @@ // 'unpack' channel to specific message types that will be published for it | ||
if (recentTrades !== undefined) { | ||
await this._send(ws, recentTrades) | ||
await this._send(ws, () => this._recentTradesSerialized[market]) | ||
} else { | ||
@@ -282,3 +282,3 @@ const emptyRecentTradesMessage: RecentTrades = { | ||
await this._send(ws, JSON.stringify(emptyRecentTradesMessage)) | ||
await this._send(ws, () => JSON.stringify(emptyRecentTradesMessage)) | ||
} | ||
@@ -288,22 +288,11 @@ } | ||
if (type === 'quote') { | ||
const quote = this._quotesSerialized[market] | ||
if (quote !== undefined) { | ||
await this._send(ws, quote) | ||
} | ||
await this._send(ws, () => this._quotesSerialized[market]) | ||
} | ||
if (type == 'l2snapshot') { | ||
const l2Snapshot = this._l2SnapshotsSerialized[market] | ||
if (l2Snapshot !== undefined) { | ||
await this._send(ws, l2Snapshot) | ||
} | ||
await this._send(ws, () => this._l2SnapshotsSerialized[market]) | ||
} | ||
if (type === 'l3snapshot') { | ||
const l3Snapshot = this._l3SnapshotsSerialized[market] | ||
if (l3Snapshot !== undefined) { | ||
await this._send(ws, l3Snapshot) | ||
} | ||
await this._send(ws, () => this._l3SnapshotsSerialized[market]) | ||
} | ||
@@ -324,4 +313,5 @@ | ||
const message = 'Subscription request internal error' | ||
const errorMessage = typeof err === 'string' ? err : `${err.message}, ${err.stack}` | ||
logger.log('info', `${message} , ${err.message} ${err.stack}`, meta) | ||
logger.log('info', `${message}, ${errorMessage}`, meta) | ||
try { | ||
@@ -333,9 +323,9 @@ ws.end(1011, message) | ||
private async _send(ws: WebSocket, message: any) { | ||
let elapsed = 0 | ||
private async _send(ws: WebSocket, getMessage: () => string | undefined) { | ||
let retries = 0 | ||
while (ws.getBufferedAmount() > 0) { | ||
await wait(5) | ||
elapsed += 5 | ||
await wait(2) | ||
retries += 1 | ||
if (elapsed > 500) { | ||
if (retries > 300) { | ||
ws.end(1008, 'Too much backpressure') | ||
@@ -345,3 +335,6 @@ } | ||
ws.send(message) | ||
const message = getMessage() | ||
if (message !== undefined) { | ||
ws.send(message) | ||
} | ||
} | ||
@@ -348,0 +341,0 @@ |
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
269200
3988