@maybe-remote/rxjs
Advanced tools
Comparing version 0.0.2 to 0.0.3
@@ -0,1 +1,5 @@ | ||
## 0.0.3 (2024-10-14) | ||
This was a version bump only for rxjs to align it with other projects, there were no code changes. | ||
## 0.0.2 (2024-10-14) | ||
@@ -2,0 +6,0 @@ |
{ | ||
"name": "@maybe-remote/rxjs", | ||
"version": "0.0.2", | ||
"version": "0.0.3", | ||
"dependencies": { | ||
@@ -5,0 +5,0 @@ "tslib": "^2.3.0" |
import { | ||
ClientPlugin, | ||
ServiceConnection, | ||
@@ -62,51 +63,48 @@ ServiceDefinition, | ||
export function rxjsServicePlugin(): ServicePlugin { | ||
const observables = new Map<string, Observable<any>>(); | ||
const observableSubscriptionIds = new Map<string, Set<string>>(); | ||
const subscriptionObservableIds = new Map<string, string>(); | ||
const subscriptions = new Map<string, Subscription>(); | ||
export class RxJSServicePlugin implements ServicePlugin { | ||
private observables = new Map<string, Observable<any>>(); | ||
private observableSubscriptionIds = new Map<string, Set<string>>(); | ||
private subscriptionObservableIds = new Map<string, string>(); | ||
private subscriptions = new Map<string, Subscription>(); | ||
const trackSubscription = ( | ||
private trackSubscription( | ||
observableId: string, | ||
subscriptionId: string, | ||
subscription: Subscription | ||
) => { | ||
) { | ||
const subscriptionIds = | ||
observableSubscriptionIds.get(observableId) ?? new Set(); | ||
this.observableSubscriptionIds.get(observableId) ?? new Set(); | ||
subscriptionIds.add(subscriptionId); | ||
observableSubscriptionIds.set(observableId, subscriptionIds); | ||
subscriptions.set(subscriptionId, subscription); | ||
}; | ||
this.observableSubscriptionIds.set(observableId, subscriptionIds); | ||
this.subscriptions.set(subscriptionId, subscription); | ||
} | ||
const untrackSubscription = (subscriptionId: string) => { | ||
const observableId = subscriptionObservableIds.get(subscriptionId); | ||
private untrackSubscription(subscriptionId: string) { | ||
const observableId = this.subscriptionObservableIds.get(subscriptionId); | ||
if (observableId) { | ||
subscriptionObservableIds.delete(subscriptionId); | ||
const subscriptionIds = observableSubscriptionIds.get(observableId); | ||
this.subscriptionObservableIds.delete(subscriptionId); | ||
const subscriptionIds = this.observableSubscriptionIds.get(observableId); | ||
if (subscriptionIds) { | ||
subscriptionIds.delete(subscriptionId); | ||
if (subscriptionIds.size === 0) { | ||
observableSubscriptionIds.delete(observableId); | ||
this.observableSubscriptionIds.delete(observableId); | ||
} | ||
} | ||
} | ||
subscriptions.delete(subscriptionId); | ||
}; | ||
this.subscriptions.delete(subscriptionId); | ||
} | ||
const trackObservable = ( | ||
observableId: string, | ||
observable: Observable<any> | ||
) => { | ||
observables.set(observableId, observable); | ||
}; | ||
private trackObservable(observableId: string, observable: Observable<any>) { | ||
this.observables.set(observableId, observable); | ||
} | ||
const untrackObservable = (observableId: string) => { | ||
observables.delete(observableId); | ||
}; | ||
private untrackObservable(observableId: string) { | ||
this.observables.delete(observableId); | ||
} | ||
return (( | ||
handleMessage( | ||
message: RxJSGetObservable | RxJSSubscribe | RxJSUnsubscribe | RxJSGC, | ||
connection: ServiceConnection, | ||
serviceDefintion: ServiceDefinition | ||
): boolean => { | ||
): boolean { | ||
switch (message.type) { | ||
@@ -130,3 +128,3 @@ case 'rxjs-get': { | ||
trackObservable(observableId, observable); | ||
this.trackObservable(observableId, observable); | ||
@@ -138,3 +136,3 @@ return true; | ||
const { observableId, subscriptionId } = message.payload; | ||
const observable = observables.get(observableId); | ||
const observable = this.observables.get(observableId); | ||
@@ -146,3 +144,3 @@ if (!observable) { | ||
const subscription = observable.subscribe({ | ||
next(value) { | ||
next: (value) => { | ||
connection.send({ | ||
@@ -156,6 +154,8 @@ type: 'rxjs-next', | ||
}, | ||
error(error) { | ||
observableSubscriptionIds.get(observableId)?.delete(subscriptionId); | ||
subscriptions.delete(subscriptionId); | ||
subscriptionObservableIds.delete(subscriptionId); | ||
error: (error) => { | ||
this.observableSubscriptionIds | ||
.get(observableId) | ||
?.delete(subscriptionId); | ||
this.subscriptions.delete(subscriptionId); | ||
this.subscriptionObservableIds.delete(subscriptionId); | ||
connection.send({ | ||
@@ -169,6 +169,8 @@ type: 'rxjs-error', | ||
}, | ||
complete() { | ||
observableSubscriptionIds.get(observableId)?.delete(subscriptionId); | ||
subscriptions.delete(subscriptionId); | ||
subscriptionObservableIds.delete(subscriptionId); | ||
complete: () => { | ||
this.observableSubscriptionIds | ||
.get(observableId) | ||
?.delete(subscriptionId); | ||
this.subscriptions.delete(subscriptionId); | ||
this.subscriptionObservableIds.delete(subscriptionId); | ||
connection.send({ | ||
@@ -183,3 +185,3 @@ type: 'rxjs-complete', | ||
trackSubscription(observableId, subscriptionId, subscription); | ||
this.trackSubscription(observableId, subscriptionId, subscription); | ||
@@ -191,3 +193,3 @@ return true; | ||
const { subscriptionId } = message.payload; | ||
const subscription = subscriptions.get(subscriptionId); | ||
const subscription = this.subscriptions.get(subscriptionId); | ||
@@ -198,3 +200,3 @@ if (!subscription) { | ||
untrackSubscription(subscriptionId); | ||
this.untrackSubscription(subscriptionId); | ||
@@ -207,3 +209,5 @@ subscription.unsubscribe(); | ||
const { observableIds } = message.payload; | ||
observableIds.forEach(untrackObservable); | ||
for (const observableId of observableIds) { | ||
this.untrackObservable(observableId); | ||
} | ||
return true; | ||
@@ -214,13 +218,15 @@ } | ||
return false; | ||
}) as any; | ||
} | ||
} | ||
export function rxjsClientPlugin() { | ||
const refs = new Map<string, WeakRef<Observable<any>>>(); | ||
export const rxjsServicePlugin = new RxJSServicePlugin(); | ||
const cleanRefs = (connection: ServiceConnection) => { | ||
export class RxJSClientPlugin implements ClientPlugin { | ||
private refs = new Map<string, WeakRef<Observable<any>>>(); | ||
private cleanRefs(connection: ServiceConnection) { | ||
let observableIds: string[] | null = null; | ||
for (const [subscriptionId, ref] of refs) { | ||
for (const [subscriptionId, ref] of this.refs) { | ||
if (!ref.deref()) { | ||
refs.delete(subscriptionId); | ||
this.refs.delete(subscriptionId); | ||
observableIds ??= []; | ||
@@ -239,16 +245,16 @@ observableIds.push(subscriptionId); | ||
} | ||
}; | ||
} | ||
let scheduledCleanRefs = 0; | ||
const scheduleCleanRefs = (connection: ServiceConnection) => { | ||
if (scheduledCleanRefs !== 0 || refs.size === 0) return; | ||
private scheduledCleanRefs = 0; | ||
private scheduleCleanRefs(connection: ServiceConnection) { | ||
if (this.scheduledCleanRefs !== 0 || this.refs.size === 0) return; | ||
scheduledCleanRefs = requestIdleCallback(() => { | ||
scheduledCleanRefs = 0; | ||
cleanRefs(connection); | ||
this.scheduledCleanRefs = requestIdleCallback(() => { | ||
this.scheduledCleanRefs = 0; | ||
this.cleanRefs(connection); | ||
}); | ||
}; | ||
} | ||
return (method: string, connection: ServiceConnection) => { | ||
scheduleCleanRefs(connection); | ||
findHandler(method: string, connection: ServiceConnection) { | ||
this.scheduleCleanRefs(connection); | ||
@@ -310,3 +316,3 @@ if (method.startsWith('stream')) { | ||
refs.set(subscriptionId, new WeakRef(observable)); | ||
this.refs.set(subscriptionId, new WeakRef(observable)); | ||
@@ -318,3 +324,5 @@ return observable; | ||
return; | ||
}; | ||
} | ||
} | ||
export const rxjsClientPlugin = new RxJSClientPlugin(); |
12183
390