@push-rpc/next
Advanced tools
Comparing version 2.0.3 to 2.0.4
export declare class RemoteSubscriptions { | ||
subscribe(initialData: unknown, itemName: string, parameters: unknown[], consumer: (d: unknown) => void): void; | ||
unsubscribe(itemName: string, parameters: unknown[], consumer: (d: unknown) => void): boolean; | ||
private addSubscription; | ||
addSubscription(itemName: string, parameters: unknown[], consumer: (d: unknown) => void): void; | ||
private removeSubscription; | ||
@@ -6,0 +5,0 @@ getCached(itemName: string, parameters: unknown[]): unknown | undefined; |
@@ -9,6 +9,2 @@ "use strict"; | ||
} | ||
subscribe(initialData, itemName, parameters, consumer) { | ||
this.addSubscription(itemName, parameters, consumer); | ||
this.consume(itemName, parameters, initialData); | ||
} | ||
unsubscribe(itemName, parameters, consumer) { | ||
@@ -15,0 +11,0 @@ const parametersKey = getParametersKey(parameters); |
@@ -23,5 +23,13 @@ "use strict"; | ||
} | ||
// Probably needs to be awaited, b/c server needs the connection to be established before making a subscription | ||
void this.connection.connect(); | ||
const data = await this.invoke(itemName, rpc_js_1.InvocationType.Subscribe, (...parameters) => this.httpClient.subscribe(itemName, parameters, callOptions?.timeout ?? this.options.callTimeout), parameters); | ||
this.remoteSubscriptions.subscribe(data, itemName, parameters, consumer); | ||
try { | ||
this.remoteSubscriptions.addSubscription(itemName, parameters, consumer); | ||
const data = await this.invoke(itemName, rpc_js_1.InvocationType.Subscribe, (...parameters) => this.httpClient.subscribe(itemName, parameters, callOptions?.timeout ?? this.options.callTimeout), parameters); | ||
this.remoteSubscriptions.consume(itemName, parameters, data); | ||
} | ||
catch (e) { | ||
await this.unsubscribe(itemName, parameters, consumer); | ||
throw e; | ||
} | ||
}; | ||
@@ -28,0 +36,0 @@ this.unsubscribe = async (itemName, parameters, consumer, callOptions) => { |
@@ -65,4 +65,3 @@ "use strict"; | ||
try { | ||
const lastData = await this.invokeLocalFunction(connectionContext, itemName, item, parameters, rpc_js_1.InvocationType.Subscribe); | ||
let lastDataJson = (0, json_js_1.safeStringify)(lastData); | ||
let lastDataJson = ""; | ||
const update = this.localSubscriptions.throttled(itemName, async (suppliedData) => { | ||
@@ -84,5 +83,8 @@ try { | ||
this.localSubscriptions.subscribe(connectionContext.clientId, itemName, parameters, update); | ||
const lastData = await this.invokeLocalFunction(connectionContext, itemName, item, parameters, rpc_js_1.InvocationType.Subscribe); | ||
lastDataJson = (0, json_js_1.safeStringify)(lastData); | ||
return lastData; | ||
} | ||
catch (e) { | ||
this.localSubscriptions.unsubscribe(connectionContext.clientId, itemName, parameters); | ||
logger_js_1.log.error(`Failed to subscribe ${itemName}`, e); | ||
@@ -89,0 +91,0 @@ throw e; |
{ | ||
"name": "@push-rpc/next", | ||
"version": "2.0.3", | ||
"version": "2.0.4", | ||
"main": "dist/index.js", | ||
@@ -5,0 +5,0 @@ "types": "dist/index.d.ts", |
import {safeStringify} from "../utils/json.js" | ||
export class RemoteSubscriptions { | ||
subscribe( | ||
initialData: unknown, | ||
itemName: string, | ||
parameters: unknown[], | ||
consumer: (d: unknown) => void | ||
) { | ||
this.addSubscription(itemName, parameters, consumer) | ||
this.consume(itemName, parameters, initialData) | ||
} | ||
unsubscribe(itemName: string, parameters: unknown[], consumer: (d: unknown) => void): boolean { | ||
@@ -21,3 +10,3 @@ const parametersKey = getParametersKey(parameters) | ||
private addSubscription(itemName: string, parameters: unknown[], consumer: (d: unknown) => void) { | ||
addSubscription(itemName: string, parameters: unknown[], consumer: (d: unknown) => void) { | ||
const itemSubscriptions = this.byItem.get(itemName) || {byParameters: new Map()} | ||
@@ -24,0 +13,0 @@ this.byItem.set(itemName, itemSubscriptions) |
@@ -116,16 +116,25 @@ import {CallOptions, InvocationType, RpcContext, Services} from "../rpc.js" | ||
// Probably needs to be awaited, b/c server needs the connection to be established before making a subscription | ||
void this.connection.connect() | ||
const data = await this.invoke( | ||
itemName, | ||
InvocationType.Subscribe, | ||
(...parameters) => | ||
this.httpClient.subscribe( | ||
itemName, | ||
parameters, | ||
callOptions?.timeout ?? this.options.callTimeout | ||
), | ||
parameters | ||
) | ||
this.remoteSubscriptions.subscribe(data, itemName, parameters, consumer) | ||
try { | ||
this.remoteSubscriptions.addSubscription(itemName, parameters, consumer) | ||
const data = await this.invoke( | ||
itemName, | ||
InvocationType.Subscribe, | ||
(...parameters) => | ||
this.httpClient.subscribe( | ||
itemName, | ||
parameters, | ||
callOptions?.timeout ?? this.options.callTimeout | ||
), | ||
parameters | ||
) | ||
this.remoteSubscriptions.consume(itemName, parameters, data) | ||
} catch (e) { | ||
await this.unsubscribe(itemName, parameters, consumer) | ||
throw e | ||
} | ||
} | ||
@@ -132,0 +141,0 @@ |
@@ -154,10 +154,3 @@ import {PublishServicesOptions, RpcServer} from "./index.js" | ||
try { | ||
const lastData = await this.invokeLocalFunction( | ||
connectionContext, | ||
itemName, | ||
item, | ||
parameters, | ||
InvocationType.Subscribe | ||
) | ||
let lastDataJson = safeStringify(lastData) | ||
let lastDataJson: string = "" | ||
@@ -195,4 +188,15 @@ const update = this.localSubscriptions.throttled(itemName, async (suppliedData?: unknown) => { | ||
const lastData = await this.invokeLocalFunction( | ||
connectionContext, | ||
itemName, | ||
item, | ||
parameters, | ||
InvocationType.Subscribe | ||
) | ||
lastDataJson = safeStringify(lastData) | ||
return lastData | ||
} catch (e) { | ||
this.localSubscriptions.unsubscribe(connectionContext.clientId, itemName, parameters) | ||
log.error(`Failed to subscribe ${itemName}`, e) | ||
@@ -199,0 +203,0 @@ throw e |
@@ -397,3 +397,3 @@ import {assert} from "chai" | ||
it.skip("unsubscribe before supply bug", async () => { | ||
it("unsubscribe before supply bug", async () => { | ||
const services = await startTestServer({ | ||
@@ -415,6 +415,6 @@ item: async () => { | ||
await adelay(20) | ||
await adelay(30) | ||
assert.equal(0, testClient!._allSubscriptions().length) | ||
assert.equal(0, testServer!._allSubscriptions().length) | ||
assert.equal(testClient!._allSubscriptions().length, 0) | ||
assert.equal(testServer!._allSubscriptions().length, 0) | ||
}) | ||
@@ -474,5 +474,5 @@ | ||
assert.equal(0, testClient!._allSubscriptions().length) | ||
assert.equal(0, testServer!._allSubscriptions().length) | ||
assert.equal(testClient!._allSubscriptions().length, 0) | ||
assert.equal(testServer!._allSubscriptions().length, 0) | ||
}).timeout(5000) | ||
}) |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
196852
4378