@aurox/distributed-observables
Advanced tools
Comparing version 0.0.8 to 0.0.9
@@ -5,8 +5,9 @@ import { TypedEmitter } from 'tiny-typed-emitter'; | ||
private _topicValues; | ||
ready(): Promise<void>; | ||
applyPatch: (request: PatchRequest<Identity>) => Promise<PatchResponse<Identity>>; | ||
requestSnapshot: (request: SnapshotRequest<Identity>) => Promise<SnapshotResponse<Identity>>; | ||
subscribeTopics: (topics: string[]) => void; | ||
unsubscribeTopics: (topics: string[]) => void; | ||
subscribeTopics: (topics: string[]) => Promise<void>; | ||
unsubscribeTopics: (topics: string[]) => Promise<void>; | ||
setTopicValue(topic: string, revision: number, state: any): void; | ||
close(): void; | ||
} |
@@ -28,3 +28,3 @@ "use strict"; | ||
}; | ||
this.subscribeTopics = (topics) => { | ||
this.subscribeTopics = async (topics) => { | ||
for (const topic of topics) { | ||
@@ -36,3 +36,3 @@ if (!this._topicValues.has(topic)) { | ||
}; | ||
this.unsubscribeTopics = (topics) => { | ||
this.unsubscribeTopics = async (topics) => { | ||
for (const topic of topics) { | ||
@@ -43,2 +43,5 @@ this._topicValues.delete(topic); | ||
} | ||
ready() { | ||
return Promise.resolve(); | ||
} | ||
setTopicValue(topic, revision, state) { | ||
@@ -45,0 +48,0 @@ this._topicValues.set(topic, { revision, state }); |
import { TypedEmitter } from 'tiny-typed-emitter'; | ||
import { ObservableState } from '../types'; | ||
import { ObservableManagerAdaptor } from '../ObservableManager'; | ||
export declare type ObservableStatus = 'pending' | 'initializing' | 'resetting' | 'ready' | 'invalid'; | ||
export interface ObservableEvents<T> { | ||
'setup': (initial: boolean) => void; | ||
'changing': (oldValue: T | null, newValue: T) => void; | ||
@@ -23,5 +25,4 @@ 'changed': (newValue: T) => void; | ||
private _adaptor; | ||
private _initialized; | ||
private _initialStatePromise; | ||
private _invalid; | ||
private _status; | ||
private _stateSnapshotRequestPromise; | ||
private _invalidReason; | ||
@@ -34,2 +35,3 @@ private _patchesInQueue; | ||
get description(): string; | ||
get status(): ObservableStatus; | ||
get initialized(): boolean; | ||
@@ -39,4 +41,5 @@ get invalid(): boolean; | ||
constructor(adapter: ObservableManagerAdaptor, topic: string, options: ObservableOptions); | ||
private requestInitialState; | ||
private requestStateSnapshot; | ||
initialize(): Promise<void>; | ||
reset(): Promise<void>; | ||
ready(): Promise<void>; | ||
@@ -43,0 +46,0 @@ getStateSync(): T | null; |
@@ -14,5 +14,4 @@ "use strict"; | ||
this._revision = -1; | ||
this._initialized = false; | ||
this._initialStatePromise = null; | ||
this._invalid = false; | ||
this._status = 'pending'; | ||
this._stateSnapshotRequestPromise = null; | ||
this._invalidReason = null; | ||
@@ -27,3 +26,8 @@ this._patchesInQueue = []; | ||
clearTimeout(this._persistRetryTimer); | ||
// wait for any on going init/reset requests before applying the persist request | ||
await this._stateSnapshotRequestPromise; | ||
const patches = this._patchesInQueue; | ||
if (patches.length === 0) { | ||
return; | ||
} | ||
this._patchesInQueue = []; | ||
@@ -33,3 +37,3 @@ try { | ||
const result = await this._applyPatchPromise; | ||
if (this._invalid) { | ||
if (this._status === 'invalid') { | ||
return; | ||
@@ -58,3 +62,3 @@ } | ||
catch (error) { | ||
if (this._invalid) { | ||
if (this._status === 'invalid') { | ||
return; | ||
@@ -84,7 +88,10 @@ } | ||
} | ||
get status() { | ||
return this._status; | ||
} | ||
get initialized() { | ||
return this._initialized; | ||
return this._status !== 'pending' && this._status !== 'initializing'; | ||
} | ||
get invalid() { | ||
return this._invalid; | ||
return this._status === 'invalid'; | ||
} | ||
@@ -94,10 +101,10 @@ get invalidReason() { | ||
} | ||
async requestInitialState() { | ||
async requestStateSnapshot() { | ||
var _a, _b; | ||
try { | ||
this._status = 'initializing'; | ||
const result = await this._adaptor.requestSnapshot(); | ||
this._initialized = true; | ||
if (result.status === 'error') { | ||
console.error(result.message); | ||
this._invalid = true; | ||
this._status = 'invalid'; | ||
this._invalidReason = result.message; | ||
@@ -109,2 +116,3 @@ this.emit('error', result.message); | ||
this._currentState = result.state; | ||
this._status = 'ready'; | ||
return this._currentState; | ||
@@ -114,3 +122,3 @@ } | ||
console.error(error); | ||
this._invalid = true; | ||
this._status = 'invalid'; | ||
this._invalidReason = (_a = error === null || error === void 0 ? void 0 : error.message) !== null && _a !== void 0 ? _a : 'Unknown Reason'; | ||
@@ -122,13 +130,24 @@ this.emit('error', (_b = error === null || error === void 0 ? void 0 : error.message) !== null && _b !== void 0 ? _b : 'Unknown Error'); | ||
async initialize() { | ||
if (this._initialized || this._initialStatePromise) { | ||
if (this.initialized || this._stateSnapshotRequestPromise) { | ||
return; | ||
} | ||
this._initialStatePromise = this.requestInitialState(); | ||
await this._initialStatePromise; | ||
this._stateSnapshotRequestPromise = this.requestStateSnapshot(); | ||
await this._stateSnapshotRequestPromise; | ||
this.emit('setup', true); | ||
} | ||
async reset() { | ||
// make sure there is no other pending init or resets pending | ||
await this._stateSnapshotRequestPromise; | ||
this._status = 'resetting'; | ||
this._stateSnapshotRequestPromise = this.requestStateSnapshot(); | ||
await this._stateSnapshotRequestPromise; | ||
this._status = 'ready'; | ||
this._invalidReason = null; | ||
this.emit('setup', false); | ||
} | ||
async ready() { | ||
await this._initialStatePromise; | ||
await this._stateSnapshotRequestPromise; | ||
} | ||
getStateSync() { | ||
if (!this._initialized) { | ||
if (!this.initialized) { | ||
throw new Error('Observable is not initialized'); | ||
@@ -139,6 +158,6 @@ } | ||
async getStateAsync() { | ||
if (this._initialized) { | ||
if (this.initialized) { | ||
return this._currentState; | ||
} | ||
return this._initialStatePromise; | ||
return this._stateSnapshotRequestPromise; | ||
} | ||
@@ -155,3 +174,3 @@ async setValue(value) { | ||
update(recipe) { | ||
if (!this._initialized) { | ||
if (!this.initialized) { | ||
throw new Error('Observable is not initialized'); | ||
@@ -158,0 +177,0 @@ } |
@@ -13,7 +13,9 @@ import { Patch } from 'immer'; | ||
export declare class ObservableManager { | ||
private _observers; | ||
private _observables; | ||
private _options; | ||
private _channel; | ||
get ready(): false | (() => Promise<void>); | ||
constructor(options: ObservableManagerOptions); | ||
setChannelClient(channel: ChannelClient): void; | ||
connectChannelClient(channel: ChannelClient): Promise<void>; | ||
private handleSetup; | ||
getObserver<T extends ObservableState>(topic: string, options?: Partial<ObservableOptions>): Observable<T>; | ||
@@ -20,0 +22,0 @@ private requestSnapshot; |
@@ -8,12 +8,29 @@ "use strict"; | ||
constructor(options) { | ||
this._observers = new Map(); | ||
this._observables = new Map(); | ||
this._channel = null; | ||
this.handleSetup = () => { | ||
for (const observable of this._observables.values()) { | ||
if (observable.status !== 'pending') { | ||
observable.reset(); | ||
} | ||
} | ||
}; | ||
this._options = options; | ||
} | ||
setChannelClient(channel) { | ||
get ready() { | ||
var _a, _b; | ||
return (_b = (_a = this._channel) === null || _a === void 0 ? void 0 : _a.ready) !== null && _b !== void 0 ? _b : false; | ||
} | ||
async connectChannelClient(channel) { | ||
if (this._channel) { | ||
this._channel.close(); | ||
this._channel.removeListener('setup', this.handleSetup); | ||
} | ||
this._channel = channel; | ||
this._channel.addListener('setup', this.handleSetup); | ||
await this._channel.ready(); | ||
} | ||
getObserver(topic, options) { | ||
var _a, _b; | ||
let observer = this._observers.get(topic); | ||
let observer = this._observables.get(topic); | ||
if (!observer) { | ||
@@ -35,3 +52,3 @@ const adaptor = { | ||
}); | ||
this._observers.set(topic, observer); | ||
this._observables.set(topic, observer); | ||
} | ||
@@ -53,5 +70,6 @@ return observer; | ||
destroy() { | ||
var _a; | ||
var _a, _b; | ||
(_a = this._channel) === null || _a === void 0 ? void 0 : _a.close(); | ||
for (const observable of this._observers.values()) { | ||
(_b = this._channel) === null || _b === void 0 ? void 0 : _b.removeListener('setup', this.handleSetup); | ||
for (const observable of this._observables.values()) { | ||
observable.detach(); | ||
@@ -58,0 +76,0 @@ } |
@@ -5,2 +5,3 @@ import { TypedEmitter } from 'tiny-typed-emitter'; | ||
export interface ChannelClientEvents { | ||
'setup': (initial: boolean) => void; | ||
'patch-broadcast': (broadcast: PatchBroadcast<Identity>) => void; | ||
@@ -10,7 +11,8 @@ 'delete-broadcast': (broadcast: DeleteBroadcast<Identity>) => void; | ||
export interface ChannelClient extends TypedEmitter<ChannelClientEvents> { | ||
ready: () => Promise<void>; | ||
applyPatch: (request: PatchRequest<Identity>) => Promise<PatchResponse<Identity>>; | ||
requestSnapshot: (request: SnapshotRequest<Identity>) => Promise<SnapshotResponse<Identity>>; | ||
subscribeTopics: (topics: string[]) => void; | ||
unsubscribeTopics: (topics: string[]) => void; | ||
subscribeTopics: (topics: string[]) => Promise<void>; | ||
unsubscribeTopics: (topics: string[]) => Promise<void>; | ||
close(): void; | ||
} |
{ | ||
"name": "@aurox/distributed-observables", | ||
"version": "0.0.8", | ||
"version": "0.0.9", | ||
"description": "A set of isomorphic helpers to enable distributed object sharing using the observer pattern", | ||
@@ -5,0 +5,0 @@ "main": "dist/index.js", |
@@ -18,3 +18,7 @@ import { TypedEmitter } from 'tiny-typed-emitter'; | ||
applyPatch = async (request: PatchRequest<Identity>): Promise<PatchResponse<Identity>> => { | ||
public ready() { | ||
return Promise.resolve(); | ||
} | ||
public applyPatch = async (request: PatchRequest<Identity>): Promise<PatchResponse<Identity>> => { | ||
const value = this._topicValues.get(request.id.topic)!; | ||
@@ -37,3 +41,3 @@ | ||
requestSnapshot = async (request: SnapshotRequest<Identity>): Promise<SnapshotResponse<Identity>> => { | ||
public requestSnapshot = async (request: SnapshotRequest<Identity>): Promise<SnapshotResponse<Identity>> => { | ||
this.subscribeTopics([request.id.topic]); | ||
@@ -46,3 +50,3 @@ | ||
subscribeTopics = (topics: string[]) => { | ||
public subscribeTopics = async (topics: string[]) => { | ||
for (const topic of topics) { | ||
@@ -55,3 +59,3 @@ if (!this._topicValues.has(topic)) { | ||
unsubscribeTopics = (topics: string[]) => { | ||
public unsubscribeTopics = async (topics: string[]) => { | ||
for (const topic of topics) { | ||
@@ -58,0 +62,0 @@ this._topicValues.delete(topic); |
@@ -13,3 +13,3 @@ import { enablePatches } from 'immer'; | ||
manager.setChannelClient(channel); | ||
manager.connectChannelClient(channel); | ||
@@ -16,0 +16,0 @@ return { channel, manager }; |
@@ -8,3 +8,6 @@ import { TypedEmitter } from 'tiny-typed-emitter'; | ||
export type ObservableStatus = 'pending' | 'initializing' | 'resetting' | 'ready' | 'invalid'; | ||
export interface ObservableEvents<T> { | ||
'setup': (initial: boolean) => void; | ||
'changing': (oldValue: T | null, newValue: T) => void; | ||
@@ -33,6 +36,6 @@ 'changed': (newValue: T) => void; | ||
private _initialized = false; | ||
private _initialStatePromise: Promise<T | null> | null = null; | ||
private _status: ObservableStatus = 'pending'; | ||
private _invalid = false; | ||
private _stateSnapshotRequestPromise: Promise<T | null> | null = null; | ||
private _invalidReason: string | null = null; | ||
@@ -55,8 +58,12 @@ | ||
public get status() { | ||
return this._status; | ||
} | ||
public get initialized() { | ||
return this._initialized; | ||
return this._status !== 'pending' && this._status !== 'initializing'; | ||
} | ||
public get invalid() { | ||
return this._invalid; | ||
return this._status === 'invalid'; | ||
} | ||
@@ -82,12 +89,12 @@ | ||
private async requestInitialState(): Promise<T | null> { | ||
private async requestStateSnapshot(): Promise<T | null> { | ||
try { | ||
this._status = 'initializing'; | ||
const result = await this._adaptor.requestSnapshot(); | ||
this._initialized = true; | ||
if (result.status === 'error') { | ||
console.error(result.message); | ||
this._invalid = true; | ||
this._status = 'invalid'; | ||
this._invalidReason = result.message; | ||
@@ -103,2 +110,4 @@ | ||
this._status = 'ready'; | ||
return this._currentState; | ||
@@ -108,3 +117,3 @@ } catch (error) { | ||
this._invalid = true; | ||
this._status = 'invalid'; | ||
this._invalidReason = error?.message ?? 'Unknown Reason'; | ||
@@ -119,17 +128,35 @@ | ||
public async initialize() { | ||
if (this._initialized || this._initialStatePromise) { | ||
if (this.initialized || this._stateSnapshotRequestPromise) { | ||
return; | ||
} | ||
this._initialStatePromise = this.requestInitialState(); | ||
this._stateSnapshotRequestPromise = this.requestStateSnapshot(); | ||
await this._initialStatePromise; | ||
await this._stateSnapshotRequestPromise; | ||
this.emit('setup', true); | ||
} | ||
public async reset() { | ||
// make sure there is no other pending init or resets pending | ||
await this._stateSnapshotRequestPromise; | ||
this._status = 'resetting'; | ||
this._stateSnapshotRequestPromise = this.requestStateSnapshot(); | ||
await this._stateSnapshotRequestPromise; | ||
this._status = 'ready'; | ||
this._invalidReason = null; | ||
this.emit('setup', false); | ||
} | ||
public async ready() { | ||
await this._initialStatePromise; | ||
await this._stateSnapshotRequestPromise; | ||
} | ||
public getStateSync(): T | null { | ||
if (!this._initialized) { | ||
if (!this.initialized) { | ||
throw new Error('Observable is not initialized'); | ||
@@ -142,7 +169,7 @@ } | ||
public async getStateAsync(): Promise<T | null> { | ||
if (this._initialized) { | ||
if (this.initialized) { | ||
return this._currentState; | ||
} | ||
return this._initialStatePromise; | ||
return this._stateSnapshotRequestPromise; | ||
} | ||
@@ -157,3 +184,11 @@ | ||
// wait for any on going init/reset requests before applying the persist request | ||
await this._stateSnapshotRequestPromise; | ||
const patches = this._patchesInQueue; | ||
if (patches.length === 0) { | ||
return; | ||
} | ||
this._patchesInQueue = []; | ||
@@ -166,3 +201,3 @@ | ||
if (this._invalid) { | ||
if (this._status === 'invalid') { | ||
return; | ||
@@ -206,3 +241,3 @@ } | ||
} catch (error) { | ||
if (this._invalid) { | ||
if (this._status === 'invalid') { | ||
return; | ||
@@ -238,3 +273,3 @@ } | ||
public update(recipe: (draft: T) => void | T) { | ||
if (!this._initialized) { | ||
if (!this.initialized) { | ||
throw new Error('Observable is not initialized'); | ||
@@ -241,0 +276,0 @@ } |
@@ -18,3 +18,3 @@ import { v4 as uuidV4 } from 'uuid'; | ||
export class ObservableManager { | ||
private _observers = new Map<string, Observable<any>>(); | ||
private _observables = new Map<string, Observable<any>>(); | ||
@@ -24,2 +24,6 @@ private _options: ObservableManagerOptions; | ||
public get ready() { | ||
return this._channel?.ready ?? false; | ||
} | ||
constructor(options: ObservableManagerOptions) { | ||
@@ -29,8 +33,26 @@ this._options = options; | ||
public setChannelClient(channel: ChannelClient) { | ||
public async connectChannelClient(channel: ChannelClient) { | ||
if (this._channel) { | ||
this._channel.close(); | ||
this._channel.removeListener('setup', this.handleSetup); | ||
} | ||
this._channel = channel; | ||
this._channel.addListener('setup', this.handleSetup); | ||
await this._channel.ready(); | ||
} | ||
private handleSetup = () => { | ||
for (const observable of this._observables.values()) { | ||
if (observable.status !== 'pending') { | ||
observable.reset(); | ||
} | ||
} | ||
}; | ||
public getObserver<T extends ObservableState>(topic: string, options?: Partial<ObservableOptions>): Observable<T> { | ||
let observer = this._observers.get(topic); | ||
let observer = this._observables.get(topic); | ||
@@ -57,3 +79,3 @@ if (!observer) { | ||
this._observers.set(topic, observer); | ||
this._observables.set(topic, observer); | ||
} | ||
@@ -83,3 +105,5 @@ | ||
for (const observable of this._observers.values()) { | ||
this._channel?.removeListener('setup', this.handleSetup); | ||
for (const observable of this._observables.values()) { | ||
observable.detach(); | ||
@@ -86,0 +110,0 @@ } |
@@ -7,2 +7,3 @@ import { TypedEmitter } from 'tiny-typed-emitter'; | ||
export interface ChannelClientEvents { | ||
'setup': (initial: boolean) => void; | ||
'patch-broadcast': (broadcast: PatchBroadcast<Identity>) => void; | ||
@@ -13,7 +14,8 @@ 'delete-broadcast': (broadcast: DeleteBroadcast<Identity>) => void; | ||
export interface ChannelClient extends TypedEmitter<ChannelClientEvents> { | ||
ready: () => Promise<void>; | ||
applyPatch: (request: PatchRequest<Identity>) => Promise<PatchResponse<Identity>>; | ||
requestSnapshot: (request: SnapshotRequest<Identity>) => Promise<SnapshotResponse<Identity>>; | ||
subscribeTopics: (topics: string[]) => void; | ||
unsubscribeTopics: (topics: string[]) => void; | ||
subscribeTopics: (topics: string[]) => Promise<void>; | ||
unsubscribeTopics: (topics: string[]) => Promise<void>; | ||
close(): void; | ||
} |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
89881
1638