@waves.exchange/config-ws
Advanced tools
Comparing version 0.1.2 to 0.2.0
import { BehaviorSubject } from '@waves.exchange/reactive'; | ||
import { ConfigState, SocketErrorEvent } from './types'; | ||
import { ConfigState } from './types'; | ||
export declare class WsConfigService { | ||
configSubject: BehaviorSubject<ConfigState>; | ||
errorsSubject: BehaviorSubject<SocketErrorEvent>; | ||
private readonly _url; | ||
private _configWebSocket; | ||
private _connectConfigTimeout; | ||
private _isOpened; | ||
private _initialValueTimers; | ||
private _timeout; | ||
private _lastMessage; | ||
constructor(url: string, timeout?: { | ||
initialValue: number; | ||
}); | ||
configUpdates(filePath: string): BehaviorSubject<ConfigState>; | ||
constructor(url: string); | ||
private ensureConfigWebSocket; | ||
private cleanUpConfigWebSocket; | ||
private pingSubscribe; | ||
private sendAnalytics; | ||
private resubscribeOnTopics; | ||
private subscribeOnCheckingMessage; | ||
configUpdates(filePath: string): BehaviorSubject<ConfigState>; | ||
} | ||
//# sourceMappingURL=configUpdates.d.ts.map |
@@ -6,3 +6,5 @@ "use strict"; | ||
const emptyConfigState = { | ||
configs: Object.create(null) | ||
configs: Object.create(null), | ||
filePath: null, | ||
type: null | ||
}; | ||
@@ -12,10 +14,5 @@ const RECONNECT_TIMEOUT = 5000; | ||
class WsConfigService { | ||
constructor(url, timeout = { initialValue: 10000 }) { | ||
constructor(url) { | ||
this.configSubject = new reactive_1.BehaviorSubject(emptyConfigState); | ||
this.errorsSubject = new reactive_1.BehaviorSubject({}); | ||
this._isOpened = false; | ||
this._initialValueTimers = Object.create(null); | ||
this._lastMessage = 0; | ||
this._url = url; | ||
this._timeout = timeout; | ||
this.ensureConfigWebSocket(); | ||
@@ -26,30 +23,5 @@ window.addEventListener('online', () => { | ||
window.addEventListener('offline', () => { | ||
var _a; | ||
(_a = this._configWebSocket) === null || _a === void 0 ? void 0 : _a.unsubscribe(1000, 'Client is offline'); | ||
this._configWebSocket = undefined; | ||
this.cleanUpConfigWebSocket(); | ||
}); | ||
} | ||
configUpdates(filePath) { | ||
const topic = `topic://${filePath}`; | ||
const ws = (this._configWebSocket || | ||
this.ensureConfigWebSocket()); | ||
this._initialValueTimers[topic] = window.setTimeout(() => { | ||
this.errorsSubject.next({ | ||
code: 3, | ||
message: 'Initial value socket timeout', | ||
params: { | ||
topic | ||
} | ||
}); | ||
}, this._timeout.initialValue); | ||
ws.multiplex(() => ({ type: 'subscribe', topic }), () => ({ type: 'unsubscribe', topic }), (message) => message.topic === topic && (message.type === 'update' || message.type === 'subscribed')).subscribe(data => { | ||
clearTimeout(this._initialValueTimers[topic]); | ||
const oldConfigs = this.configSubject.getValue().configs; | ||
this.configSubject.next({ | ||
configs: Object.assign(Object.assign({}, oldConfigs), { [filePath]: data === null || data === void 0 ? void 0 : data.value }) | ||
}); | ||
}); | ||
return this.configSubject; | ||
} | ||
ensureConfigWebSocket() { | ||
@@ -60,7 +32,12 @@ if (!this._configWebSocket) { | ||
onOpen: () => { | ||
this._isOpened = true; | ||
if (!isEmpty(this.configSubject.getValue().configs)) { | ||
const oldConfigs = this.configSubject.getValue().configs; | ||
if (oldConfigs) { | ||
Object.keys(oldConfigs).forEach(data => { | ||
this.configUpdates(data); | ||
}); | ||
} | ||
} | ||
}, | ||
onClose: event => { | ||
this.sendAnalytics(event); | ||
this._isOpened = false; | ||
onClose: () => { | ||
this.cleanUpConfigWebSocket(); | ||
@@ -74,5 +51,3 @@ if (window.navigator.onLine) { | ||
}); | ||
this.pingSubscribe(); | ||
this.resubscribeOnTopics(); | ||
this.subscribeOnCheckingMessage(); | ||
this.pingSubscribe(this._configWebSocket); | ||
} | ||
@@ -82,6 +57,2 @@ return this._configWebSocket; | ||
cleanUpConfigWebSocket() { | ||
this._lastMessage = 0; | ||
Object.values(this._initialValueTimers).forEach(timer => { | ||
clearTimeout(timer); | ||
}); | ||
if (this._configWebSocket) { | ||
@@ -92,4 +63,3 @@ this._configWebSocket.unsubscribe(); | ||
} | ||
pingSubscribe() { | ||
const ws = this._configWebSocket; | ||
pingSubscribe(ws) { | ||
ws.subscribe((message) => { | ||
@@ -99,59 +69,20 @@ if (message.type === 'ping') { | ||
} | ||
}, (err) => { | ||
const errString = err instanceof CloseEvent ? (err.reason || err.code) : String(err); | ||
console.error('Config WS Ping Error: %s', errString); | ||
}); | ||
} | ||
sendAnalytics(event) { | ||
const statParams = { | ||
wasClean: event.wasClean, | ||
eventPhase: event.eventPhase, | ||
timeStamp: event.timeStamp, | ||
readyState: event.currentTarget.readyState, | ||
code: event.code, | ||
reason: event.reason | ||
}; | ||
if (!this._isOpened) { | ||
this.errorsSubject.next({ | ||
code: 1, | ||
message: 'Failed to connect', | ||
params: statParams | ||
configUpdates(filePath) { | ||
const topic = `topic://${filePath}`; | ||
const ws = (this._configWebSocket || | ||
this.ensureConfigWebSocket()); | ||
ws.multiplex(() => ({ type: 'subscribe', topic }), () => ({ type: 'unsubscribe', topic }), (message) => message.topic === topic && message.type === 'update').subscribe(data => { | ||
const oldConfigs = this.configSubject.getValue().configs; | ||
this.configSubject.next({ | ||
configs: Object.assign(Object.assign({}, oldConfigs), { [filePath]: data === null || data === void 0 ? void 0 : data.value }), | ||
filePath: data.topic, | ||
type: data.type | ||
}); | ||
} | ||
if (this._isOpened && !event.wasClean) { | ||
this.errorsSubject.next({ | ||
code: 2, | ||
message: 'Connection closed dirty', | ||
params: { | ||
error: statParams | ||
} | ||
}); | ||
} | ||
} | ||
resubscribeOnTopics() { | ||
if (!isEmpty(this.configSubject.getValue().configs)) { | ||
const oldConfigs = this.configSubject.getValue().configs; | ||
if (oldConfigs) { | ||
Object.keys(oldConfigs).forEach(data => { | ||
this.configUpdates(data); | ||
}); | ||
} | ||
} | ||
} | ||
subscribeOnCheckingMessage() { | ||
const ws = this._configWebSocket; | ||
ws.subscribe((message) => { | ||
var _a; | ||
if (this._lastMessage + 1 !== message.message_number) { | ||
this.errorsSubject.next({ | ||
code: 4, | ||
message: 'Wrong message number', | ||
params: { | ||
message_number: message.message_number, | ||
mustBe: this._lastMessage + 1 | ||
} | ||
}); | ||
(_a = this._configWebSocket) === null || _a === void 0 ? void 0 : _a.unsubscribe(1000, 'Wrong message number'); | ||
this._configWebSocket = undefined; | ||
return; | ||
} | ||
this._lastMessage = message.message_number; | ||
}); | ||
return this.configSubject; | ||
} | ||
@@ -158,0 +89,0 @@ } |
@@ -32,7 +32,7 @@ export declare type PingUpdate = { | ||
export declare type PongMessage = { | ||
type: 'pong' | 'ping'; | ||
type: 'pong'; | ||
message_number: number; | ||
}; | ||
export declare type SubscribeMessage = { | ||
type: 'subscribe' | 'unsubscribe' | 'subscribed' | 'unsubscribed'; | ||
type: 'subscribe' | 'unsubscribe'; | ||
topic: string; | ||
@@ -43,8 +43,5 @@ value?: undefined; | ||
configs: Record<string, Record<any, any>>; | ||
filePath: string | null; | ||
type: string | null; | ||
}; | ||
export declare type SocketErrorEvent = { | ||
code?: number; | ||
message?: string; | ||
params?: Record<string, any>; | ||
}; | ||
//# sourceMappingURL=types.d.ts.map |
{ | ||
"name": "@waves.exchange/config-ws", | ||
"version": "0.1.2", | ||
"version": "0.2.0", | ||
"main": "dist/index.js", | ||
@@ -11,3 +11,3 @@ "typings": "dist/index.d.ts", | ||
"dependencies": { | ||
"@waves.exchange/reactive": "^0.1.0" | ||
"@waves.exchange/reactive": "^0.0.1" | ||
}, | ||
@@ -14,0 +14,0 @@ "devDependencies": { |
import { WebSocketSubject, webSocket, BehaviorSubject } from '@waves.exchange/reactive'; | ||
import { ConfigState, PongMessage, ContentUpdate, SubscribeMessage, SocketErrorEvent, CommonUpdate } from './types'; | ||
import { ConfigState, PongMessage, ContentUpdate, SubscribeMessage } from './types'; | ||
const emptyConfigState: ConfigState = { | ||
configs: Object.create(null) | ||
configs: Object.create(null), | ||
filePath: null, | ||
type: null | ||
}; | ||
@@ -11,7 +13,5 @@ | ||
const isEmpty = (object: Record<any, any>) => Object.keys(object).length === 0 && object.constructor === Object | ||
export class WsConfigService { | ||
public configSubject = new BehaviorSubject<ConfigState>(emptyConfigState); | ||
public errorsSubject = new BehaviorSubject<SocketErrorEvent>({}); | ||
@@ -21,11 +21,5 @@ private readonly _url: string; | ||
private _connectConfigTimeout: number | undefined; | ||
private _isOpened = false; | ||
private _initialValueTimers: Record<string, number | undefined> = Object.create(null); | ||
private _timeout: Record<'initialValue' , number>; | ||
private _lastMessage = 0; | ||
constructor(url: string, timeout = { initialValue: 10000 }) { | ||
constructor(url: string) { | ||
this._url = url; | ||
this._timeout = timeout; | ||
this.ensureConfigWebSocket(); | ||
@@ -38,4 +32,2 @@ | ||
window.addEventListener('offline', () => { | ||
this._configWebSocket?.unsubscribe(1000, 'Client is offline'); | ||
this._configWebSocket = undefined; | ||
this.cleanUpConfigWebSocket(); | ||
@@ -45,50 +37,18 @@ }); | ||
public configUpdates(filePath: string): BehaviorSubject<ConfigState> { | ||
const topic = `topic://${filePath}`; | ||
const ws = ( | ||
this._configWebSocket || | ||
this.ensureConfigWebSocket() | ||
) as WebSocketSubject<ContentUpdate | SubscribeMessage>; | ||
this._initialValueTimers[topic] = window.setTimeout(() => { | ||
this.errorsSubject.next({ | ||
code: 3, | ||
message: 'Initial value socket timeout', | ||
params: { | ||
topic | ||
} | ||
}); | ||
}, this._timeout.initialValue); | ||
ws.multiplex( | ||
() => ({ type: 'subscribe', topic }), | ||
() => ({ type: 'unsubscribe', topic }), | ||
(message): message is ContentUpdate => | ||
message.topic === topic && (message.type === 'update' || message.type === 'subscribed'), | ||
).subscribe(data => { | ||
clearTimeout(this._initialValueTimers[topic]); | ||
const oldConfigs = this.configSubject.getValue().configs; | ||
this.configSubject.next({ | ||
configs: { | ||
...oldConfigs, | ||
[filePath]: data?.value | ||
} | ||
}); | ||
}); | ||
return this.configSubject; | ||
} | ||
private ensureConfigWebSocket(): WebSocketSubject<unknown> { | ||
if (!this._configWebSocket) { | ||
window.clearTimeout(this._connectConfigTimeout); | ||
this._configWebSocket = webSocket(this._url, { | ||
onOpen: () => { | ||
this._isOpened = true; | ||
if (!isEmpty(this.configSubject.getValue().configs)) { | ||
const oldConfigs = this.configSubject.getValue().configs; | ||
if (oldConfigs) { | ||
Object.keys(oldConfigs).forEach(data => { | ||
this.configUpdates(data); | ||
}); | ||
} | ||
} | ||
}, | ||
onClose: event => { | ||
this.sendAnalytics(event); | ||
this._isOpened = false; | ||
onClose: () => { | ||
this.cleanUpConfigWebSocket(); | ||
@@ -102,5 +62,3 @@ if (window.navigator.onLine) { | ||
}); | ||
this.pingSubscribe(); | ||
this.resubscribeOnTopics(); | ||
this.subscribeOnCheckingMessage(); | ||
this.pingSubscribe(this._configWebSocket as WebSocketSubject<PongMessage>); | ||
} | ||
@@ -113,6 +71,2 @@ | ||
private cleanUpConfigWebSocket(): void { | ||
this._lastMessage = 0; | ||
Object.values(this._initialValueTimers).forEach(timer => { | ||
clearTimeout(timer); | ||
}); | ||
if (this._configWebSocket) { | ||
@@ -124,70 +78,41 @@ this._configWebSocket.unsubscribe(); | ||
private pingSubscribe(): void { | ||
const ws = this._configWebSocket as WebSocketSubject<PongMessage>; | ||
ws.subscribe((message: PongMessage) => { | ||
private pingSubscribe(ws: WebSocketSubject<PongMessage>): void { | ||
ws.subscribe((message: any) => { | ||
if (message.type === 'ping') { | ||
ws.next({ type: 'pong', message_number: message.message_number }); | ||
} | ||
}, (err) => { | ||
const errString = err instanceof CloseEvent ? (err.reason || err.code) : String(err); | ||
console.error('Config WS Ping Error: %s', errString); | ||
}); | ||
} | ||
private sendAnalytics(event: CloseEvent) { | ||
const statParams = { | ||
wasClean: event.wasClean, | ||
eventPhase: event.eventPhase, | ||
timeStamp: event.timeStamp, | ||
readyState: (event.currentTarget as WebSocket).readyState, | ||
code: event.code, | ||
reason: event.reason | ||
}; | ||
if (!this._isOpened) { | ||
this.errorsSubject.next({ | ||
code: 1, | ||
message: 'Failed to connect', | ||
params: statParams | ||
}); | ||
} | ||
if (this._isOpened && !event.wasClean) { | ||
this.errorsSubject.next({ | ||
code: 2, | ||
message: 'Connection closed dirty', | ||
params: { | ||
error: statParams | ||
} | ||
}); | ||
} | ||
} | ||
public configUpdates(filePath: string): BehaviorSubject<ConfigState> { | ||
const topic = `topic://${filePath}`; | ||
private resubscribeOnTopics() { | ||
if (!isEmpty(this.configSubject.getValue().configs)) { | ||
const ws = ( | ||
this._configWebSocket || | ||
this.ensureConfigWebSocket() | ||
) as WebSocketSubject<ContentUpdate | SubscribeMessage>; | ||
ws.multiplex( | ||
() => ({ type: 'subscribe', topic }), | ||
() => ({ type: 'unsubscribe', topic }), | ||
(message): message is ContentUpdate => message.topic === topic && message.type === 'update', | ||
).subscribe(data => { | ||
const oldConfigs = this.configSubject.getValue().configs; | ||
if (oldConfigs) { | ||
Object.keys(oldConfigs).forEach(data => { | ||
this.configUpdates(data); | ||
}); | ||
} | ||
} | ||
} | ||
this.configSubject.next({ | ||
configs: { | ||
...oldConfigs, | ||
[filePath]: data?.value | ||
}, | ||
filePath: data.topic, | ||
type: data.type | ||
}); | ||
}); | ||
private subscribeOnCheckingMessage() { | ||
const ws = this._configWebSocket as WebSocketSubject<CommonUpdate>; | ||
ws.subscribe((message) => { | ||
if (this._lastMessage + 1 !== message.message_number) { | ||
this.errorsSubject.next({ | ||
code: 4, | ||
message: | ||
'Wrong message number', | ||
params: { | ||
message_number: message.message_number, | ||
mustBe: this._lastMessage + 1 | ||
} | ||
}); | ||
this._configWebSocket?.unsubscribe(1000, 'Wrong message number'); | ||
this._configWebSocket = undefined; | ||
return; | ||
} | ||
this._lastMessage = message.message_number; | ||
}); | ||
return this.configSubject; | ||
} | ||
} |
@@ -39,3 +39,3 @@ | ||
export type PongMessage = { | ||
type: 'pong' | 'ping'; | ||
type: 'pong'; | ||
message_number: number; | ||
@@ -45,3 +45,3 @@ }; | ||
export type SubscribeMessage = { | ||
type: 'subscribe' | 'unsubscribe' | 'subscribed' | 'unsubscribed'; | ||
type: 'subscribe' | 'unsubscribe'; | ||
topic: string; | ||
@@ -52,8 +52,2 @@ value?: undefined; | ||
export type ConfigState = { configs: Record<string, Record<any, any>> }; | ||
export type SocketErrorEvent = { | ||
code?: number; | ||
message?: string; | ||
params?: Record<string, any>; | ||
} | ||
export type ConfigState = { configs: Record<string, Record<any, any>>, filePath: string|null, type: string|null }; |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
New author
Supply chain riskA new npm collaborator published a version of the package for the first time. New collaborators are usually benign additions to a project, but do indicate a change to the security surface area of a package.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
15669
286
1
+ Added@waves.exchange/reactive@0.0.1(transitive)
- Removed@waves.exchange/reactive@0.1.0(transitive)