@push-rpc/next
Advanced tools
Comparing version 2.0.6 to 2.0.7
@@ -25,5 +25,6 @@ "use strict"; | ||
async httpRequest(method, itemName, params, callTimeout, headers) { | ||
const itemUrl = this.getItemUrl(itemName); | ||
try { | ||
const { signal, finished } = timeoutSignal(callTimeout); | ||
const response = await fetch(this.getItemUrl(itemName), { | ||
const response = await fetch(itemUrl, { | ||
method, | ||
@@ -58,2 +59,5 @@ headers: { | ||
catch (e) { | ||
if (e.message == "Error" || !e.message) { | ||
e.message = `Error ${e.code} while ${itemUrl}`; | ||
} | ||
if (e.message == "fetch failed" && e.cause) { | ||
@@ -60,0 +64,0 @@ e = e.cause; |
export declare class RemoteSubscriptions { | ||
unsubscribe(itemName: string, parameters: unknown[], consumer: (d: unknown) => void): boolean; | ||
addSubscription(itemName: string, parameters: unknown[], consumer: (d: unknown) => void): void; | ||
pause(itemName: string, parameters: unknown[]): void; | ||
unpause(itemName: string, parameters: unknown[]): void; | ||
flushQueue(itemName: string, parameters: unknown[]): void; | ||
emptyQueue(itemName: string, parameters: unknown[]): void; | ||
private removeSubscription; | ||
@@ -12,3 +16,4 @@ getCached(itemName: string, parameters: unknown[]): unknown | undefined; | ||
]>; | ||
private getFilterSubscriptions; | ||
private byItem; | ||
} |
@@ -21,2 +21,3 @@ "use strict"; | ||
consumers: [], | ||
queue: [], | ||
}; | ||
@@ -26,2 +27,32 @@ itemSubscriptions.byParameters.set(parametersKey, parameterSubscriptions); | ||
} | ||
pause(itemName, parameters) { | ||
const filterSubscriptions = this.getFilterSubscriptions(itemName, parameters); | ||
if (!filterSubscriptions) | ||
return; | ||
filterSubscriptions.paused = true; | ||
} | ||
unpause(itemName, parameters) { | ||
const filterSubscriptions = this.getFilterSubscriptions(itemName, parameters); | ||
if (!filterSubscriptions) | ||
return; | ||
filterSubscriptions.paused = false; | ||
} | ||
flushQueue(itemName, parameters) { | ||
const filterSubscriptions = this.getFilterSubscriptions(itemName, parameters); | ||
if (!filterSubscriptions) | ||
return; | ||
filterSubscriptions.queue.forEach((data) => { | ||
filterSubscriptions.cached = data; | ||
filterSubscriptions.consumers.forEach((consumer) => { | ||
consumer(data); | ||
}); | ||
}); | ||
filterSubscriptions.queue = []; | ||
} | ||
emptyQueue(itemName, parameters) { | ||
const filterSubscriptions = this.getFilterSubscriptions(itemName, parameters); | ||
if (!filterSubscriptions) | ||
return; | ||
filterSubscriptions.queue = []; | ||
} | ||
removeSubscription(itemName, parametersKey, consumer) { | ||
@@ -48,21 +79,20 @@ const itemSubscriptions = this.byItem.get(itemName); | ||
getCached(itemName, parameters) { | ||
const parametersKey = getParametersKey(parameters); | ||
const itemSubscriptions = this.byItem.get(itemName); | ||
if (!itemSubscriptions) | ||
const filterSubscriptions = this.getFilterSubscriptions(itemName, parameters); | ||
if (!filterSubscriptions) | ||
return; | ||
const filterSubscriptions = itemSubscriptions.byParameters.get(parametersKey); | ||
return filterSubscriptions?.cached; | ||
return filterSubscriptions.cached; | ||
} | ||
consume(itemName, parameters, data) { | ||
const parametersKey = getParametersKey(parameters); | ||
const itemSubscriptions = this.byItem.get(itemName); | ||
if (!itemSubscriptions) | ||
return; | ||
const filterSubscriptions = itemSubscriptions.byParameters.get(parametersKey); | ||
const filterSubscriptions = this.getFilterSubscriptions(itemName, parameters); | ||
if (!filterSubscriptions) | ||
return; | ||
filterSubscriptions.cached = data; | ||
filterSubscriptions.consumers.forEach((consumer) => { | ||
consumer(data); | ||
}); | ||
if (filterSubscriptions.paused) { | ||
filterSubscriptions.queue.push(data); | ||
} | ||
else { | ||
filterSubscriptions.cached = data; | ||
filterSubscriptions.consumers.forEach((consumer) => { | ||
consumer(data); | ||
}); | ||
} | ||
} | ||
@@ -78,2 +108,12 @@ getAllSubscriptions() { | ||
} | ||
getFilterSubscriptions(itemName, parameters) { | ||
const parametersKey = getParametersKey(parameters); | ||
const itemSubscriptions = this.byItem.get(itemName); | ||
if (!itemSubscriptions) | ||
return; | ||
const filterSubscriptions = itemSubscriptions.byParameters.get(parametersKey); | ||
if (!filterSubscriptions) | ||
return; | ||
return filterSubscriptions; | ||
} | ||
} | ||
@@ -80,0 +120,0 @@ exports.RemoteSubscriptions = RemoteSubscriptions; |
@@ -28,6 +28,11 @@ "use strict"; | ||
this.remoteSubscriptions.addSubscription(itemName, parameters, consumer); | ||
this.remoteSubscriptions.pause(itemName, parameters); | ||
const data = await this.invoke(itemName, rpc_js_1.InvocationType.Subscribe, (...parameters) => this.httpClient.subscribe(itemName, parameters, callOptions?.timeout ?? this.options.callTimeout), parameters); | ||
this.remoteSubscriptions.unpause(itemName, parameters); | ||
this.remoteSubscriptions.consume(itemName, parameters, data); | ||
this.remoteSubscriptions.flushQueue(itemName, parameters); | ||
} | ||
catch (e) { | ||
this.remoteSubscriptions.unpause(itemName, parameters); | ||
this.remoteSubscriptions.emptyQueue(itemName, parameters); | ||
await this.unsubscribe(itemName, parameters, consumer); | ||
@@ -34,0 +39,0 @@ throw e; |
{ | ||
"name": "@push-rpc/next", | ||
"version": "2.0.6", | ||
"version": "2.0.7", | ||
"main": "dist/index.js", | ||
@@ -5,0 +5,0 @@ "types": "dist/index.d.ts", |
@@ -32,2 +32,3 @@ ## Glossary | ||
- Browser sockets don't have 'ping' event. Need to find a different way to detect connection loss. | ||
- Перевірити, що throttling працює відразу для всіх підписників | ||
@@ -41,3 +42,3 @@ ## Features | ||
- Supports compressed HTTP requests. | ||
- Server runs on Node.JS, client runs in the Node.JS/Browser/ReactNative. Bun/Deno should also work, but not officially | ||
supported. | ||
- Server runs on Node.JS, client runs in the Node.JS/Browser/ReactNative. For RN some extra setup is required ( | ||
document). Bun/Deno should also work, but not officially supported. |
@@ -34,6 +34,8 @@ import {CLIENT_ID_HEADER, RpcErrors} from "../rpc.js" | ||
): Promise<unknown> { | ||
const itemUrl = this.getItemUrl(itemName) | ||
try { | ||
const {signal, finished} = timeoutSignal(callTimeout) | ||
const response = await fetch(this.getItemUrl(itemName), { | ||
const response = await fetch(itemUrl, { | ||
method, | ||
@@ -76,2 +78,6 @@ headers: { | ||
} catch (e: any) { | ||
if (e.message == "Error" || !e.message) { | ||
e.message = `Error ${e.code} while ${itemUrl}` | ||
} | ||
if (e.message == "fetch failed" && e.cause) { | ||
@@ -78,0 +84,0 @@ e = e.cause |
@@ -15,7 +15,11 @@ import {safeStringify} from "../utils/json.js" | ||
const parametersKey = getParametersKey(parameters) | ||
const parameterSubscriptions = itemSubscriptions.byParameters.get(parametersKey) || { | ||
const parameterSubscriptions: ParametersSubscription = itemSubscriptions.byParameters.get( | ||
parametersKey | ||
) || { | ||
parameters, | ||
cached: null, | ||
consumers: [], | ||
queue: [], | ||
} | ||
itemSubscriptions.byParameters.set(parametersKey, parameterSubscriptions) | ||
@@ -25,2 +29,37 @@ parameterSubscriptions.consumers.push(consumer) | ||
pause(itemName: string, parameters: unknown[]) { | ||
const filterSubscriptions = this.getFilterSubscriptions(itemName, parameters) | ||
if (!filterSubscriptions) return | ||
filterSubscriptions.paused = true | ||
} | ||
unpause(itemName: string, parameters: unknown[]) { | ||
const filterSubscriptions = this.getFilterSubscriptions(itemName, parameters) | ||
if (!filterSubscriptions) return | ||
filterSubscriptions.paused = false | ||
} | ||
flushQueue(itemName: string, parameters: unknown[]) { | ||
const filterSubscriptions = this.getFilterSubscriptions(itemName, parameters) | ||
if (!filterSubscriptions) return | ||
filterSubscriptions.queue.forEach((data) => { | ||
filterSubscriptions.cached = data | ||
filterSubscriptions.consumers.forEach((consumer) => { | ||
consumer(data) | ||
}) | ||
}) | ||
filterSubscriptions.queue = [] | ||
} | ||
emptyQueue(itemName: string, parameters: unknown[]) { | ||
const filterSubscriptions = this.getFilterSubscriptions(itemName, parameters) | ||
if (!filterSubscriptions) return | ||
filterSubscriptions.queue = [] | ||
} | ||
private removeSubscription( | ||
@@ -56,26 +95,20 @@ itemName: string, | ||
getCached(itemName: string, parameters: unknown[]): unknown | undefined { | ||
const parametersKey = getParametersKey(parameters) | ||
const filterSubscriptions = this.getFilterSubscriptions(itemName, parameters) | ||
if (!filterSubscriptions) return | ||
const itemSubscriptions = this.byItem.get(itemName) | ||
if (!itemSubscriptions) return | ||
const filterSubscriptions = itemSubscriptions.byParameters.get(parametersKey) | ||
return filterSubscriptions?.cached | ||
return filterSubscriptions.cached | ||
} | ||
consume(itemName: string, parameters: unknown[], data: unknown) { | ||
const parametersKey = getParametersKey(parameters) | ||
const itemSubscriptions = this.byItem.get(itemName) | ||
if (!itemSubscriptions) return | ||
const filterSubscriptions = itemSubscriptions.byParameters.get(parametersKey) | ||
const filterSubscriptions = this.getFilterSubscriptions(itemName, parameters) | ||
if (!filterSubscriptions) return | ||
filterSubscriptions.cached = data | ||
filterSubscriptions.consumers.forEach((consumer) => { | ||
consumer(data) | ||
}) | ||
if (filterSubscriptions.paused) { | ||
filterSubscriptions.queue.push(data) | ||
} else { | ||
filterSubscriptions.cached = data | ||
filterSubscriptions.consumers.forEach((consumer) => { | ||
consumer(data) | ||
}) | ||
} | ||
} | ||
@@ -97,2 +130,17 @@ | ||
private getFilterSubscriptions( | ||
itemName: string, | ||
parameters: unknown[] | ||
): ParametersSubscription | undefined { | ||
const parametersKey = getParametersKey(parameters) | ||
const itemSubscriptions = this.byItem.get(itemName) | ||
if (!itemSubscriptions) return | ||
const filterSubscriptions = itemSubscriptions.byParameters.get(parametersKey) | ||
if (!filterSubscriptions) return | ||
return filterSubscriptions | ||
} | ||
private byItem: Map<string, ItemSubscription> = new Map() | ||
@@ -102,14 +150,16 @@ } | ||
type ItemSubscription = { | ||
byParameters: Map< | ||
string, | ||
{ | ||
parameters: unknown[] | ||
cached: unknown | ||
consumers: Array<(d: unknown) => void> | ||
} | ||
> | ||
byParameters: Map<string, ParametersSubscription> | ||
} | ||
type ParametersSubscription = { | ||
parameters: unknown[] | ||
cached: unknown | ||
consumers: Array<(d: unknown) => void> | ||
paused: boolean | ||
queue: unknown[] | ||
} | ||
function getParametersKey(parameters: unknown[]) { | ||
return safeStringify(parameters) | ||
} |
@@ -122,2 +122,3 @@ import {CallOptions, InvocationType, RpcContext, Services} from "../rpc.js" | ||
this.remoteSubscriptions.addSubscription(itemName, parameters, consumer) | ||
this.remoteSubscriptions.pause(itemName, parameters) | ||
@@ -136,4 +137,8 @@ const data = await this.invoke( | ||
this.remoteSubscriptions.unpause(itemName, parameters) | ||
this.remoteSubscriptions.consume(itemName, parameters, data) | ||
this.remoteSubscriptions.flushQueue(itemName, parameters) | ||
} catch (e) { | ||
this.remoteSubscriptions.unpause(itemName, parameters) | ||
this.remoteSubscriptions.emptyQueue(itemName, parameters) | ||
await this.unsubscribe(itemName, parameters, consumer) | ||
@@ -140,0 +145,0 @@ throw e |
@@ -475,2 +475,123 @@ import {assert} from "chai" | ||
}).timeout(5000) | ||
it("missing update in case of concurrent subscribe/trigger", async () => { | ||
const delay = 50 | ||
const services = await startTestServer({ | ||
test: { | ||
async longOp() { | ||
await adelay(delay) | ||
return 1 | ||
}, | ||
}, | ||
}) | ||
const client = await createTestClient<typeof services>({ | ||
callTimeout: 2 * delay, | ||
}) | ||
let received = 0 | ||
client.test.longOp.subscribe((val) => { | ||
console.log("got ", val) | ||
received = val | ||
}) | ||
await adelay(20) | ||
services.test.longOp.trigger(undefined, 2) | ||
await adelay(2 * delay) | ||
assert.equal(received, 2) | ||
}) | ||
it.skip("two concurrent subscribes and trigger", async () => { | ||
const delay = 50 | ||
const services = await startTestServer({ | ||
test: { | ||
async longOp() { | ||
await adelay(delay) | ||
return 1 | ||
}, | ||
}, | ||
}) | ||
const client = await createTestClient<typeof services>({ | ||
callTimeout: 2 * delay, | ||
}) | ||
let receivedA = 0 | ||
let receivedB = 0 | ||
client.test.longOp.subscribe((val) => { | ||
console.log("gotA", val) | ||
receivedA = val | ||
}) | ||
client.test.longOp.subscribe((val) => { | ||
console.log("gotB", val) | ||
receivedB = val | ||
}) | ||
await adelay(20) | ||
services.test.longOp.trigger(undefined, 2) | ||
await adelay(2 * delay) | ||
assert.equal(receivedA, 2) | ||
assert.equal(receivedB, 2) | ||
}) | ||
it("clear queue on subscription failure", async () => { | ||
const delay = 50 | ||
let invocation = 0 | ||
const services = await startTestServer({ | ||
test: { | ||
async longOp(): Promise<number> { | ||
await adelay(delay) | ||
if (invocation++ == 1) { | ||
throw new Error("AA") | ||
} | ||
return 1 | ||
}, | ||
}, | ||
}) | ||
const client = await createTestClient<typeof services>({ | ||
callTimeout: 2 * delay, | ||
}) | ||
let received = 0 | ||
client.test.longOp.subscribe((val) => { | ||
received = val | ||
}) | ||
await adelay(1.5 * delay) | ||
client.test.longOp | ||
.subscribe((val) => { | ||
received = val | ||
}) | ||
.catch((e) => { | ||
// ok | ||
}) | ||
await adelay(20) | ||
services.test.longOp.trigger(undefined, 2) // this should be skipped | ||
await adelay(1.5 * delay - 20) | ||
client.test.longOp.subscribe((val) => { | ||
received = val | ||
}) | ||
await adelay(1.5 * delay) | ||
assert.equal(received, 1) | ||
}) | ||
}) |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
207317
4603
43