New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

@push-rpc/next

Package Overview
Dependencies
Maintainers
1
Versions
29
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@push-rpc/next - npm Package Compare versions

Comparing version 2.0.6 to 2.0.7

6

dist/client/HttpClient.js

@@ -25,5 +25,6 @@ "use strict";

async httpRequest(method, itemName, params, callTimeout, headers) {
const itemUrl = this.getItemUrl(itemName);
try {
const { signal, finished } = timeoutSignal(callTimeout);
const response = await fetch(this.getItemUrl(itemName), {
const response = await fetch(itemUrl, {
method,

@@ -58,2 +59,5 @@ headers: {

catch (e) {
if (e.message == "Error" || !e.message) {
e.message = `Error ${e.code} while ${itemUrl}`;
}
if (e.message == "fetch failed" && e.cause) {

@@ -60,0 +64,0 @@ e = e.cause;

export declare class RemoteSubscriptions {
unsubscribe(itemName: string, parameters: unknown[], consumer: (d: unknown) => void): boolean;
addSubscription(itemName: string, parameters: unknown[], consumer: (d: unknown) => void): void;
pause(itemName: string, parameters: unknown[]): void;
unpause(itemName: string, parameters: unknown[]): void;
flushQueue(itemName: string, parameters: unknown[]): void;
emptyQueue(itemName: string, parameters: unknown[]): void;
private removeSubscription;

@@ -12,3 +16,4 @@ getCached(itemName: string, parameters: unknown[]): unknown | undefined;

]>;
private getFilterSubscriptions;
private byItem;
}

68

dist/client/RemoteSubscriptions.js

@@ -21,2 +21,3 @@ "use strict";

consumers: [],
queue: [],
};

@@ -26,2 +27,32 @@ itemSubscriptions.byParameters.set(parametersKey, parameterSubscriptions);

}
pause(itemName, parameters) {
const filterSubscriptions = this.getFilterSubscriptions(itemName, parameters);
if (!filterSubscriptions)
return;
filterSubscriptions.paused = true;
}
unpause(itemName, parameters) {
const filterSubscriptions = this.getFilterSubscriptions(itemName, parameters);
if (!filterSubscriptions)
return;
filterSubscriptions.paused = false;
}
flushQueue(itemName, parameters) {
const filterSubscriptions = this.getFilterSubscriptions(itemName, parameters);
if (!filterSubscriptions)
return;
filterSubscriptions.queue.forEach((data) => {
filterSubscriptions.cached = data;
filterSubscriptions.consumers.forEach((consumer) => {
consumer(data);
});
});
filterSubscriptions.queue = [];
}
emptyQueue(itemName, parameters) {
const filterSubscriptions = this.getFilterSubscriptions(itemName, parameters);
if (!filterSubscriptions)
return;
filterSubscriptions.queue = [];
}
removeSubscription(itemName, parametersKey, consumer) {

@@ -48,21 +79,20 @@ const itemSubscriptions = this.byItem.get(itemName);

getCached(itemName, parameters) {
const parametersKey = getParametersKey(parameters);
const itemSubscriptions = this.byItem.get(itemName);
if (!itemSubscriptions)
const filterSubscriptions = this.getFilterSubscriptions(itemName, parameters);
if (!filterSubscriptions)
return;
const filterSubscriptions = itemSubscriptions.byParameters.get(parametersKey);
return filterSubscriptions?.cached;
return filterSubscriptions.cached;
}
consume(itemName, parameters, data) {
const parametersKey = getParametersKey(parameters);
const itemSubscriptions = this.byItem.get(itemName);
if (!itemSubscriptions)
return;
const filterSubscriptions = itemSubscriptions.byParameters.get(parametersKey);
const filterSubscriptions = this.getFilterSubscriptions(itemName, parameters);
if (!filterSubscriptions)
return;
filterSubscriptions.cached = data;
filterSubscriptions.consumers.forEach((consumer) => {
consumer(data);
});
if (filterSubscriptions.paused) {
filterSubscriptions.queue.push(data);
}
else {
filterSubscriptions.cached = data;
filterSubscriptions.consumers.forEach((consumer) => {
consumer(data);
});
}
}

@@ -78,2 +108,12 @@ getAllSubscriptions() {

}
getFilterSubscriptions(itemName, parameters) {
const parametersKey = getParametersKey(parameters);
const itemSubscriptions = this.byItem.get(itemName);
if (!itemSubscriptions)
return;
const filterSubscriptions = itemSubscriptions.byParameters.get(parametersKey);
if (!filterSubscriptions)
return;
return filterSubscriptions;
}
}

@@ -80,0 +120,0 @@ exports.RemoteSubscriptions = RemoteSubscriptions;

@@ -28,6 +28,11 @@ "use strict";

this.remoteSubscriptions.addSubscription(itemName, parameters, consumer);
this.remoteSubscriptions.pause(itemName, parameters);
const data = await this.invoke(itemName, rpc_js_1.InvocationType.Subscribe, (...parameters) => this.httpClient.subscribe(itemName, parameters, callOptions?.timeout ?? this.options.callTimeout), parameters);
this.remoteSubscriptions.unpause(itemName, parameters);
this.remoteSubscriptions.consume(itemName, parameters, data);
this.remoteSubscriptions.flushQueue(itemName, parameters);
}
catch (e) {
this.remoteSubscriptions.unpause(itemName, parameters);
this.remoteSubscriptions.emptyQueue(itemName, parameters);
await this.unsubscribe(itemName, parameters, consumer);

@@ -34,0 +39,0 @@ throw e;

{
"name": "@push-rpc/next",
"version": "2.0.6",
"version": "2.0.7",
"main": "dist/index.js",

@@ -5,0 +5,0 @@ "types": "dist/index.d.ts",

@@ -32,2 +32,3 @@ ## Glossary

- Browser sockets don't have 'ping' event. Need to find a different way to detect connection loss.
- Перевірити, що throttling працює відразу для всіх підписників

@@ -41,3 +42,3 @@ ## Features

- Supports compressed HTTP requests.
- Server runs on Node.JS, client runs in the Node.JS/Browser/ReactNative. Bun/Deno should also work, but not officially
supported.
- Server runs on Node.JS, client runs in the Node.JS/Browser/ReactNative. For RN some extra setup is required (
document). Bun/Deno should also work, but not officially supported.

@@ -34,6 +34,8 @@ import {CLIENT_ID_HEADER, RpcErrors} from "../rpc.js"

): Promise<unknown> {
const itemUrl = this.getItemUrl(itemName)
try {
const {signal, finished} = timeoutSignal(callTimeout)
const response = await fetch(this.getItemUrl(itemName), {
const response = await fetch(itemUrl, {
method,

@@ -76,2 +78,6 @@ headers: {

} catch (e: any) {
if (e.message == "Error" || !e.message) {
e.message = `Error ${e.code} while ${itemUrl}`
}
if (e.message == "fetch failed" && e.cause) {

@@ -78,0 +84,0 @@ e = e.cause

@@ -15,7 +15,11 @@ import {safeStringify} from "../utils/json.js"

const parametersKey = getParametersKey(parameters)
const parameterSubscriptions = itemSubscriptions.byParameters.get(parametersKey) || {
const parameterSubscriptions: ParametersSubscription = itemSubscriptions.byParameters.get(
parametersKey
) || {
parameters,
cached: null,
consumers: [],
queue: [],
}
itemSubscriptions.byParameters.set(parametersKey, parameterSubscriptions)

@@ -25,2 +29,37 @@ parameterSubscriptions.consumers.push(consumer)

pause(itemName: string, parameters: unknown[]) {
const filterSubscriptions = this.getFilterSubscriptions(itemName, parameters)
if (!filterSubscriptions) return
filterSubscriptions.paused = true
}
unpause(itemName: string, parameters: unknown[]) {
const filterSubscriptions = this.getFilterSubscriptions(itemName, parameters)
if (!filterSubscriptions) return
filterSubscriptions.paused = false
}
flushQueue(itemName: string, parameters: unknown[]) {
const filterSubscriptions = this.getFilterSubscriptions(itemName, parameters)
if (!filterSubscriptions) return
filterSubscriptions.queue.forEach((data) => {
filterSubscriptions.cached = data
filterSubscriptions.consumers.forEach((consumer) => {
consumer(data)
})
})
filterSubscriptions.queue = []
}
emptyQueue(itemName: string, parameters: unknown[]) {
const filterSubscriptions = this.getFilterSubscriptions(itemName, parameters)
if (!filterSubscriptions) return
filterSubscriptions.queue = []
}
private removeSubscription(

@@ -56,26 +95,20 @@ itemName: string,

getCached(itemName: string, parameters: unknown[]): unknown | undefined {
const parametersKey = getParametersKey(parameters)
const filterSubscriptions = this.getFilterSubscriptions(itemName, parameters)
if (!filterSubscriptions) return
const itemSubscriptions = this.byItem.get(itemName)
if (!itemSubscriptions) return
const filterSubscriptions = itemSubscriptions.byParameters.get(parametersKey)
return filterSubscriptions?.cached
return filterSubscriptions.cached
}
consume(itemName: string, parameters: unknown[], data: unknown) {
const parametersKey = getParametersKey(parameters)
const itemSubscriptions = this.byItem.get(itemName)
if (!itemSubscriptions) return
const filterSubscriptions = itemSubscriptions.byParameters.get(parametersKey)
const filterSubscriptions = this.getFilterSubscriptions(itemName, parameters)
if (!filterSubscriptions) return
filterSubscriptions.cached = data
filterSubscriptions.consumers.forEach((consumer) => {
consumer(data)
})
if (filterSubscriptions.paused) {
filterSubscriptions.queue.push(data)
} else {
filterSubscriptions.cached = data
filterSubscriptions.consumers.forEach((consumer) => {
consumer(data)
})
}
}

@@ -97,2 +130,17 @@

private getFilterSubscriptions(
itemName: string,
parameters: unknown[]
): ParametersSubscription | undefined {
const parametersKey = getParametersKey(parameters)
const itemSubscriptions = this.byItem.get(itemName)
if (!itemSubscriptions) return
const filterSubscriptions = itemSubscriptions.byParameters.get(parametersKey)
if (!filterSubscriptions) return
return filterSubscriptions
}
private byItem: Map<string, ItemSubscription> = new Map()

@@ -102,14 +150,16 @@ }

type ItemSubscription = {
byParameters: Map<
string,
{
parameters: unknown[]
cached: unknown
consumers: Array<(d: unknown) => void>
}
>
byParameters: Map<string, ParametersSubscription>
}
type ParametersSubscription = {
parameters: unknown[]
cached: unknown
consumers: Array<(d: unknown) => void>
paused: boolean
queue: unknown[]
}
function getParametersKey(parameters: unknown[]) {
return safeStringify(parameters)
}

@@ -122,2 +122,3 @@ import {CallOptions, InvocationType, RpcContext, Services} from "../rpc.js"

this.remoteSubscriptions.addSubscription(itemName, parameters, consumer)
this.remoteSubscriptions.pause(itemName, parameters)

@@ -136,4 +137,8 @@ const data = await this.invoke(

this.remoteSubscriptions.unpause(itemName, parameters)
this.remoteSubscriptions.consume(itemName, parameters, data)
this.remoteSubscriptions.flushQueue(itemName, parameters)
} catch (e) {
this.remoteSubscriptions.unpause(itemName, parameters)
this.remoteSubscriptions.emptyQueue(itemName, parameters)
await this.unsubscribe(itemName, parameters, consumer)

@@ -140,0 +145,0 @@ throw e

@@ -475,2 +475,123 @@ import {assert} from "chai"

}).timeout(5000)
it("missing update in case of concurrent subscribe/trigger", async () => {
const delay = 50
const services = await startTestServer({
test: {
async longOp() {
await adelay(delay)
return 1
},
},
})
const client = await createTestClient<typeof services>({
callTimeout: 2 * delay,
})
let received = 0
client.test.longOp.subscribe((val) => {
console.log("got ", val)
received = val
})
await adelay(20)
services.test.longOp.trigger(undefined, 2)
await adelay(2 * delay)
assert.equal(received, 2)
})
it.skip("two concurrent subscribes and trigger", async () => {
const delay = 50
const services = await startTestServer({
test: {
async longOp() {
await adelay(delay)
return 1
},
},
})
const client = await createTestClient<typeof services>({
callTimeout: 2 * delay,
})
let receivedA = 0
let receivedB = 0
client.test.longOp.subscribe((val) => {
console.log("gotA", val)
receivedA = val
})
client.test.longOp.subscribe((val) => {
console.log("gotB", val)
receivedB = val
})
await adelay(20)
services.test.longOp.trigger(undefined, 2)
await adelay(2 * delay)
assert.equal(receivedA, 2)
assert.equal(receivedB, 2)
})
it("clear queue on subscription failure", async () => {
const delay = 50
let invocation = 0
const services = await startTestServer({
test: {
async longOp(): Promise<number> {
await adelay(delay)
if (invocation++ == 1) {
throw new Error("AA")
}
return 1
},
},
})
const client = await createTestClient<typeof services>({
callTimeout: 2 * delay,
})
let received = 0
client.test.longOp.subscribe((val) => {
received = val
})
await adelay(1.5 * delay)
client.test.longOp
.subscribe((val) => {
received = val
})
.catch((e) => {
// ok
})
await adelay(20)
services.test.longOp.trigger(undefined, 2) // this should be skipped
await adelay(1.5 * delay - 20)
client.test.longOp.subscribe((val) => {
received = val
})
await adelay(1.5 * delay)
assert.equal(received, 1)
})
})

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc