@push-rpc/next
Advanced tools
Comparing version 2.0.17 to 2.0.18
{ | ||
"printWidth": 100, | ||
"semi": false, | ||
"trailingComma": "es5", | ||
"trailingComma": "all", | ||
"bracketSpacing": false, | ||
"htmlWhitespaceSensitivity": "ignore" | ||
} |
@@ -6,2 +6,3 @@ import { ClientCache } from "./index"; | ||
unsubscribe(itemName: string, parameters: unknown[], consumer: (d: unknown) => void): boolean; | ||
/** Add subscription in pending state */ | ||
addSubscription(itemName: string, parameters: unknown[], consumer: (d: unknown) => void): void; | ||
@@ -15,2 +16,3 @@ pause(itemName: string, parameters: unknown[]): void; | ||
consume(itemName: string, parameters: unknown[], data: unknown): void; | ||
getConsumerSubscription(itemName: string, parameters: unknown[], consumer: (d: unknown) => void): ConsumerSubscription | undefined; | ||
getAllSubscriptions(): Array<[ | ||
@@ -24,1 +26,6 @@ itemName: string, | ||
} | ||
type ConsumerSubscription = { | ||
consumer: (d: unknown) => void; | ||
completed: boolean; | ||
}; | ||
export {}; |
@@ -14,2 +14,3 @@ "use strict"; | ||
} | ||
/** Add subscription in pending state */ | ||
addSubscription(itemName, parameters, consumer) { | ||
@@ -26,3 +27,6 @@ const itemSubscriptions = this.byItem.get(itemName) || { byParameters: new Map() }; | ||
itemSubscriptions.byParameters.set(parametersKey, parameterSubscriptions); | ||
parameterSubscriptions.consumers.push(consumer); | ||
parameterSubscriptions.consumers.push({ | ||
consumer, | ||
completed: false, | ||
}); | ||
} | ||
@@ -50,3 +54,5 @@ pause(itemName, parameters) { | ||
filterSubscriptions.consumers.forEach((consumer) => { | ||
consumer(data); | ||
if (consumer.completed) { | ||
consumer.consumer(data); | ||
} | ||
}); | ||
@@ -69,3 +75,3 @@ }); | ||
return false; | ||
const index = filterSubscriptions.consumers.indexOf(consumer); | ||
const index = filterSubscriptions.consumers.findIndex((c) => c.consumer == consumer); | ||
if (index == -1) | ||
@@ -101,6 +107,12 @@ return false; | ||
filterSubscriptions.consumers.forEach((consumer) => { | ||
consumer(data); | ||
if (consumer.completed) { | ||
consumer.consumer(data); | ||
} | ||
}); | ||
} | ||
} | ||
getConsumerSubscription(itemName, parameters, consumer) { | ||
const filterSubscriptions = this.getFilterSubscriptions(itemName, parameters); | ||
return (filterSubscriptions?.consumers || []).find((c) => c.consumer == consumer); | ||
} | ||
getAllSubscriptions() { | ||
@@ -110,3 +122,8 @@ const result = []; | ||
for (const [, parameterSubscriptions] of itemSubscriptions.byParameters) { | ||
result.push([itemName, parameterSubscriptions.parameters, parameterSubscriptions.consumers]); | ||
const consumers = parameterSubscriptions.consumers | ||
.filter((c) => c.completed) | ||
.map((c) => c.consumer); | ||
if (consumers.length) { | ||
result.push([itemName, parameterSubscriptions.parameters, consumers]); | ||
} | ||
} | ||
@@ -113,0 +130,0 @@ } |
@@ -23,2 +23,4 @@ "use strict"; | ||
} | ||
// add subscription in pending state to test later if it was unsubscribed during connection wait | ||
this.remoteSubscriptions.addSubscription(itemName, parameters, consumer); | ||
// Needs to be awaited b/c resubscribe will make a 2nd request then. | ||
@@ -28,3 +30,8 @@ // Also, server needs the connection to be established before making a subscription | ||
try { | ||
this.remoteSubscriptions.addSubscription(itemName, parameters, consumer); | ||
// check if already unsubscribed | ||
const sub = this.remoteSubscriptions.getConsumerSubscription(itemName, parameters, consumer); | ||
if (!sub) | ||
return; | ||
// mark as completed - will resubscribe on reconnects | ||
sub.completed = true; | ||
this.remoteSubscriptions.pause(itemName, parameters); | ||
@@ -31,0 +38,0 @@ const data = await this.invoke(itemName, rpc_js_1.InvocationType.Subscribe, (...parameters) => this.httpClient.subscribe(itemName, parameters, callOptions?.timeout ?? this.options.callTimeout), parameters); |
{ | ||
"name": "@push-rpc/next", | ||
"version": "2.0.17", | ||
"version": "2.0.18", | ||
"main": "dist/index.js", | ||
@@ -5,0 +5,0 @@ "types": "dist/index.d.ts", |
@@ -6,4 +6,3 @@ import {safeStringify} from "../utils/json.js" | ||
export class RemoteSubscriptions { | ||
constructor(private cache: ClientCache | null) { | ||
} | ||
constructor(private cache: ClientCache | null) {} | ||
@@ -16,2 +15,3 @@ unsubscribe(itemName: string, parameters: unknown[], consumer: (d: unknown) => void): boolean { | ||
/** Add subscription in pending state */ | ||
addSubscription(itemName: string, parameters: unknown[], consumer: (d: unknown) => void) { | ||
@@ -32,3 +32,6 @@ const itemSubscriptions = this.byItem.get(itemName) || {byParameters: new Map()} | ||
itemSubscriptions.byParameters.set(parametersKey, parameterSubscriptions) | ||
parameterSubscriptions.consumers.push(consumer) | ||
parameterSubscriptions.consumers.push({ | ||
consumer, | ||
completed: false, | ||
}) | ||
} | ||
@@ -58,3 +61,5 @@ | ||
filterSubscriptions.consumers.forEach((consumer) => { | ||
consumer(data) | ||
if (consumer.completed) { | ||
consumer.consumer(data) | ||
} | ||
}) | ||
@@ -84,3 +89,3 @@ }) | ||
const index = filterSubscriptions.consumers.indexOf(consumer) | ||
const index = filterSubscriptions.consumers.findIndex((c) => c.consumer == consumer) | ||
if (index == -1) return false | ||
@@ -120,3 +125,5 @@ | ||
filterSubscriptions.consumers.forEach((consumer) => { | ||
consumer(data) | ||
if (consumer.completed) { | ||
consumer.consumer(data) | ||
} | ||
}) | ||
@@ -126,2 +133,11 @@ } | ||
getConsumerSubscription( | ||
itemName: string, | ||
parameters: unknown[], | ||
consumer: (d: unknown) => void, | ||
): ConsumerSubscription | undefined { | ||
const filterSubscriptions = this.getFilterSubscriptions(itemName, parameters) | ||
return (filterSubscriptions?.consumers || []).find((c) => c.consumer == consumer) | ||
} | ||
getAllSubscriptions(): Array< | ||
@@ -134,3 +150,9 @@ [itemName: string, parameters: unknown[], consumers: Array<(d: unknown) => void>] | ||
for (const [, parameterSubscriptions] of itemSubscriptions.byParameters) { | ||
result.push([itemName, parameterSubscriptions.parameters, parameterSubscriptions.consumers]) | ||
const consumers = parameterSubscriptions.consumers | ||
.filter((c) => c.completed) | ||
.map((c) => c.consumer) | ||
if (consumers.length) { | ||
result.push([itemName, parameterSubscriptions.parameters, consumers]) | ||
} | ||
} | ||
@@ -167,3 +189,3 @@ } | ||
cached: unknown | ||
consumers: Array<(d: unknown) => void> | ||
consumers: ConsumerSubscription[] | ||
@@ -174,4 +196,9 @@ paused: boolean | ||
type ConsumerSubscription = { | ||
consumer: (d: unknown) => void | ||
completed: boolean | ||
} | ||
function getParametersKey(parameters: unknown[]) { | ||
return safeStringify(parameters) | ||
} |
@@ -8,3 +8,3 @@ import {CallOptions, InvocationType, RpcContext, Services} from "../rpc.js" | ||
import {ConsumeServicesOptions, RpcClient} from "./index.js" | ||
import {Middleware, withMiddlewares} from "../utils/middleware.js" | ||
import {withMiddlewares} from "../utils/middleware.js" | ||
@@ -132,2 +132,5 @@ export class RpcClientImpl<S extends Services<S>> implements RpcClient { | ||
// add subscription in pending state to test later if it was unsubscribed during connection wait | ||
this.remoteSubscriptions.addSubscription(itemName, parameters, consumer) | ||
// Needs to be awaited b/c resubscribe will make a 2nd request then. | ||
@@ -138,3 +141,9 @@ // Also, server needs the connection to be established before making a subscription | ||
try { | ||
this.remoteSubscriptions.addSubscription(itemName, parameters, consumer) | ||
// check if already unsubscribed | ||
const sub = this.remoteSubscriptions.getConsumerSubscription(itemName, parameters, consumer) | ||
if (!sub) return | ||
// mark as completed - will resubscribe on reconnects | ||
sub.completed = true | ||
this.remoteSubscriptions.pause(itemName, parameters) | ||
@@ -141,0 +150,0 @@ |
@@ -7,2 +7,3 @@ import {assert} from "chai" | ||
import {CLIENT_ID_HEADER} from "../src/rpc.js" | ||
import WebSocket from "ws" | ||
@@ -74,4 +75,3 @@ describe("Subscriptions", () => { | ||
try { | ||
await client.item.subscribe(() => { | ||
}) | ||
await client.item.subscribe(() => {}) | ||
assert.fail("Error expected") | ||
@@ -95,4 +95,3 @@ } catch (e: any) { | ||
remote.item | ||
.subscribe(() => { | ||
}) | ||
.subscribe(() => {}) | ||
.catch((e: any) => { | ||
@@ -286,4 +285,3 @@ // ignored | ||
await remote.testUnsub.item.subscribe(() => { | ||
}) | ||
await remote.testUnsub.item.subscribe(() => {}) | ||
@@ -347,6 +345,4 @@ assert.equal(1, testServer?._allSubscriptions().length) | ||
const sub1 = () => { | ||
} | ||
const sub2 = () => { | ||
} | ||
const sub1 = () => {} | ||
const sub2 = () => {} | ||
@@ -383,4 +379,3 @@ await remote.item.subscribe(sub1) | ||
const sub = () => { | ||
} | ||
const sub = () => {} | ||
@@ -411,6 +406,4 @@ await remote.item.subscribe(sub) | ||
const sub1 = () => { | ||
} | ||
const sub2 = () => { | ||
} | ||
const sub1 = () => {} | ||
const sub2 = () => {} | ||
@@ -449,4 +442,3 @@ await remote.item.subscribe(sub1) | ||
const sub2 = () => { | ||
} | ||
const sub2 = () => {} | ||
@@ -480,4 +472,3 @@ await client.test.item.subscribe(sub1) | ||
const sub = () => { | ||
} | ||
const sub = () => {} | ||
client.item.subscribe(sub) | ||
@@ -495,2 +486,45 @@ | ||
it("unsubscribe while disconnected bug", async () => { | ||
const services = await startTestServer({ | ||
item: async () => { | ||
return 1 | ||
}, | ||
}) | ||
// delay client connection open by 10ms | ||
let oldAddEL: typeof WebSocket.prototype.addEventListener | ||
oldAddEL = WebSocket.prototype.addEventListener | ||
WebSocket.prototype.addEventListener = function (eventName: any, callback: any) { | ||
if (eventName == "open") { | ||
oldAddEL.apply(this, [ | ||
eventName, | ||
() => { | ||
setTimeout(callback, 10) | ||
}, | ||
]) | ||
return | ||
} | ||
return oldAddEL.apply(this, [eventName, callback]) | ||
} | ||
const client = await createTestClient<typeof services>() | ||
const sub = () => {} | ||
client.item.subscribe(sub) | ||
await adelay(10) | ||
client.item.unsubscribe(sub) | ||
await adelay(40) | ||
assert.equal(testClient!._allSubscriptions().length, 0) | ||
assert.equal(testServer!._allSubscriptions().length, 0) | ||
WebSocket.prototype.addEventListener = oldAddEL | ||
}) | ||
it("skip unchanged data", async () => { | ||
@@ -542,4 +576,3 @@ const item = {r: "1"} | ||
try { | ||
await client.test.longOp.subscribe(() => { | ||
}, new CallOptions({timeout: callTimeout})) | ||
await client.test.longOp.subscribe(() => {}, new CallOptions({timeout: callTimeout})) | ||
assert.fail() | ||
@@ -546,0 +579,0 @@ } catch (e: any) { |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
232932
5111