Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

@push-rpc/next

Package Overview
Dependencies
Maintainers
0
Versions
25
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@push-rpc/next - npm Package Compare versions

Comparing version 2.0.17 to 2.0.18

2

.prettierrc.json
{
"printWidth": 100,
"semi": false,
"trailingComma": "es5",
"trailingComma": "all",
"bracketSpacing": false,
"htmlWhitespaceSensitivity": "ignore"
}

@@ -6,2 +6,3 @@ import { ClientCache } from "./index";

unsubscribe(itemName: string, parameters: unknown[], consumer: (d: unknown) => void): boolean;
/** Add subscription in pending state */
addSubscription(itemName: string, parameters: unknown[], consumer: (d: unknown) => void): void;

@@ -15,2 +16,3 @@ pause(itemName: string, parameters: unknown[]): void;

consume(itemName: string, parameters: unknown[], data: unknown): void;
getConsumerSubscription(itemName: string, parameters: unknown[], consumer: (d: unknown) => void): ConsumerSubscription | undefined;
getAllSubscriptions(): Array<[

@@ -24,1 +26,6 @@ itemName: string,

}
type ConsumerSubscription = {
consumer: (d: unknown) => void;
completed: boolean;
};
export {};

@@ -14,2 +14,3 @@ "use strict";

}
/** Add subscription in pending state */
addSubscription(itemName, parameters, consumer) {

@@ -26,3 +27,6 @@ const itemSubscriptions = this.byItem.get(itemName) || { byParameters: new Map() };

itemSubscriptions.byParameters.set(parametersKey, parameterSubscriptions);
parameterSubscriptions.consumers.push(consumer);
parameterSubscriptions.consumers.push({
consumer,
completed: false,
});
}

@@ -50,3 +54,5 @@ pause(itemName, parameters) {

filterSubscriptions.consumers.forEach((consumer) => {
consumer(data);
if (consumer.completed) {
consumer.consumer(data);
}
});

@@ -69,3 +75,3 @@ });

return false;
const index = filterSubscriptions.consumers.indexOf(consumer);
const index = filterSubscriptions.consumers.findIndex((c) => c.consumer == consumer);
if (index == -1)

@@ -101,6 +107,12 @@ return false;

filterSubscriptions.consumers.forEach((consumer) => {
consumer(data);
if (consumer.completed) {
consumer.consumer(data);
}
});
}
}
getConsumerSubscription(itemName, parameters, consumer) {
const filterSubscriptions = this.getFilterSubscriptions(itemName, parameters);
return (filterSubscriptions?.consumers || []).find((c) => c.consumer == consumer);
}
getAllSubscriptions() {

@@ -110,3 +122,8 @@ const result = [];

for (const [, parameterSubscriptions] of itemSubscriptions.byParameters) {
result.push([itemName, parameterSubscriptions.parameters, parameterSubscriptions.consumers]);
const consumers = parameterSubscriptions.consumers
.filter((c) => c.completed)
.map((c) => c.consumer);
if (consumers.length) {
result.push([itemName, parameterSubscriptions.parameters, consumers]);
}
}

@@ -113,0 +130,0 @@ }

@@ -23,2 +23,4 @@ "use strict";

}
// add subscription in pending state to test later if it was unsubscribed during connection wait
this.remoteSubscriptions.addSubscription(itemName, parameters, consumer);
// Needs to be awaited b/c resubscribe will make a 2nd request then.

@@ -28,3 +30,8 @@ // Also, server needs the connection to be established before making a subscription

try {
this.remoteSubscriptions.addSubscription(itemName, parameters, consumer);
// check if already unsubscribed
const sub = this.remoteSubscriptions.getConsumerSubscription(itemName, parameters, consumer);
if (!sub)
return;
// mark as completed - will resubscribe on reconnects
sub.completed = true;
this.remoteSubscriptions.pause(itemName, parameters);

@@ -31,0 +38,0 @@ const data = await this.invoke(itemName, rpc_js_1.InvocationType.Subscribe, (...parameters) => this.httpClient.subscribe(itemName, parameters, callOptions?.timeout ?? this.options.callTimeout), parameters);

{
"name": "@push-rpc/next",
"version": "2.0.17",
"version": "2.0.18",
"main": "dist/index.js",

@@ -5,0 +5,0 @@ "types": "dist/index.d.ts",

@@ -6,4 +6,3 @@ import {safeStringify} from "../utils/json.js"

export class RemoteSubscriptions {
constructor(private cache: ClientCache | null) {
}
constructor(private cache: ClientCache | null) {}

@@ -16,2 +15,3 @@ unsubscribe(itemName: string, parameters: unknown[], consumer: (d: unknown) => void): boolean {

/** Add subscription in pending state */
addSubscription(itemName: string, parameters: unknown[], consumer: (d: unknown) => void) {

@@ -32,3 +32,6 @@ const itemSubscriptions = this.byItem.get(itemName) || {byParameters: new Map()}

itemSubscriptions.byParameters.set(parametersKey, parameterSubscriptions)
parameterSubscriptions.consumers.push(consumer)
parameterSubscriptions.consumers.push({
consumer,
completed: false,
})
}

@@ -58,3 +61,5 @@

filterSubscriptions.consumers.forEach((consumer) => {
consumer(data)
if (consumer.completed) {
consumer.consumer(data)
}
})

@@ -84,3 +89,3 @@ })

const index = filterSubscriptions.consumers.indexOf(consumer)
const index = filterSubscriptions.consumers.findIndex((c) => c.consumer == consumer)
if (index == -1) return false

@@ -120,3 +125,5 @@

filterSubscriptions.consumers.forEach((consumer) => {
consumer(data)
if (consumer.completed) {
consumer.consumer(data)
}
})

@@ -126,2 +133,11 @@ }

getConsumerSubscription(
itemName: string,
parameters: unknown[],
consumer: (d: unknown) => void,
): ConsumerSubscription | undefined {
const filterSubscriptions = this.getFilterSubscriptions(itemName, parameters)
return (filterSubscriptions?.consumers || []).find((c) => c.consumer == consumer)
}
getAllSubscriptions(): Array<

@@ -134,3 +150,9 @@ [itemName: string, parameters: unknown[], consumers: Array<(d: unknown) => void>]

for (const [, parameterSubscriptions] of itemSubscriptions.byParameters) {
result.push([itemName, parameterSubscriptions.parameters, parameterSubscriptions.consumers])
const consumers = parameterSubscriptions.consumers
.filter((c) => c.completed)
.map((c) => c.consumer)
if (consumers.length) {
result.push([itemName, parameterSubscriptions.parameters, consumers])
}
}

@@ -167,3 +189,3 @@ }

cached: unknown
consumers: Array<(d: unknown) => void>
consumers: ConsumerSubscription[]

@@ -174,4 +196,9 @@ paused: boolean

type ConsumerSubscription = {
consumer: (d: unknown) => void
completed: boolean
}
function getParametersKey(parameters: unknown[]) {
return safeStringify(parameters)
}

@@ -8,3 +8,3 @@ import {CallOptions, InvocationType, RpcContext, Services} from "../rpc.js"

import {ConsumeServicesOptions, RpcClient} from "./index.js"
import {Middleware, withMiddlewares} from "../utils/middleware.js"
import {withMiddlewares} from "../utils/middleware.js"

@@ -132,2 +132,5 @@ export class RpcClientImpl<S extends Services<S>> implements RpcClient {

// add subscription in pending state to test later if it was unsubscribed during connection wait
this.remoteSubscriptions.addSubscription(itemName, parameters, consumer)
// Needs to be awaited b/c resubscribe will make a 2nd request then.

@@ -138,3 +141,9 @@ // Also, server needs the connection to be established before making a subscription

try {
this.remoteSubscriptions.addSubscription(itemName, parameters, consumer)
// check if already unsubscribed
const sub = this.remoteSubscriptions.getConsumerSubscription(itemName, parameters, consumer)
if (!sub) return
// mark as completed - will resubscribe on reconnects
sub.completed = true
this.remoteSubscriptions.pause(itemName, parameters)

@@ -141,0 +150,0 @@

@@ -7,2 +7,3 @@ import {assert} from "chai"

import {CLIENT_ID_HEADER} from "../src/rpc.js"
import WebSocket from "ws"

@@ -74,4 +75,3 @@ describe("Subscriptions", () => {

try {
await client.item.subscribe(() => {
})
await client.item.subscribe(() => {})
assert.fail("Error expected")

@@ -95,4 +95,3 @@ } catch (e: any) {

remote.item
.subscribe(() => {
})
.subscribe(() => {})
.catch((e: any) => {

@@ -286,4 +285,3 @@ // ignored

await remote.testUnsub.item.subscribe(() => {
})
await remote.testUnsub.item.subscribe(() => {})

@@ -347,6 +345,4 @@ assert.equal(1, testServer?._allSubscriptions().length)

const sub1 = () => {
}
const sub2 = () => {
}
const sub1 = () => {}
const sub2 = () => {}

@@ -383,4 +379,3 @@ await remote.item.subscribe(sub1)

const sub = () => {
}
const sub = () => {}

@@ -411,6 +406,4 @@ await remote.item.subscribe(sub)

const sub1 = () => {
}
const sub2 = () => {
}
const sub1 = () => {}
const sub2 = () => {}

@@ -449,4 +442,3 @@ await remote.item.subscribe(sub1)

const sub2 = () => {
}
const sub2 = () => {}

@@ -480,4 +472,3 @@ await client.test.item.subscribe(sub1)

const sub = () => {
}
const sub = () => {}
client.item.subscribe(sub)

@@ -495,2 +486,45 @@

it("unsubscribe while disconnected bug", async () => {
const services = await startTestServer({
item: async () => {
return 1
},
})
// delay client connection open by 10ms
let oldAddEL: typeof WebSocket.prototype.addEventListener
oldAddEL = WebSocket.prototype.addEventListener
WebSocket.prototype.addEventListener = function (eventName: any, callback: any) {
if (eventName == "open") {
oldAddEL.apply(this, [
eventName,
() => {
setTimeout(callback, 10)
},
])
return
}
return oldAddEL.apply(this, [eventName, callback])
}
const client = await createTestClient<typeof services>()
const sub = () => {}
client.item.subscribe(sub)
await adelay(10)
client.item.unsubscribe(sub)
await adelay(40)
assert.equal(testClient!._allSubscriptions().length, 0)
assert.equal(testServer!._allSubscriptions().length, 0)
WebSocket.prototype.addEventListener = oldAddEL
})
it("skip unchanged data", async () => {

@@ -542,4 +576,3 @@ const item = {r: "1"}

try {
await client.test.longOp.subscribe(() => {
}, new CallOptions({timeout: callTimeout}))
await client.test.longOp.subscribe(() => {}, new CallOptions({timeout: callTimeout}))
assert.fail()

@@ -546,0 +579,0 @@ } catch (e: any) {

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc