Socket
Socket
Sign inDemoInstall

@push-rpc/core

Package Overview
Dependencies
Maintainers
1
Versions
62
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@push-rpc/core - npm Package Compare versions

Comparing version 1.8.0 to 1.8.1

4

dist/local.js

@@ -76,3 +76,5 @@ "use strict";

const index = subscription.sessions.indexOf(session);
subscription.sessions.splice(index, 1);
if (index >= 0) {
subscription.sessions.splice(index, 1);
}
if (!subscription.sessions.length) {

@@ -79,0 +81,0 @@ delete this.subscriptions[key];

@@ -0,1 +1,2 @@

import { LocalTopicImpl } from "./local";
import { CallOptions, MessageType, Middleware, RpcConnectionContext, RpcContext } from "./rpc";

@@ -24,4 +25,4 @@ import { Socket } from "./transport";

subscriptions: {
topic: any;
params: any;
topic: LocalTopicImpl<unknown, unknown>;
params: unknown;
}[];

@@ -28,0 +29,0 @@ lastMessageAt: number;

@@ -325,7 +325,8 @@ "use strict";

const ctx = this.createContext(messageId, topic.getTopicName());
// topic.subscribeSession will subscribe even when throwing the error, so lets add it to the list
this.subscriptions.push({ topic, params });
this.listeners.subscribed(this.subscriptions.length);
const subscribeTopic = (p = params) => topic.subscribeSession(this, p, messageId, ctx);
const r = await this.localMiddleware(ctx, subscribeTopic, params, rpc_1.MessageType.Subscribe);
this.send(rpc_1.MessageType.Data, messageId, topic.getTopicName(), cloneParams(params), r);
this.subscriptions.push({ topic, params });
this.listeners.subscribed(this.subscriptions.length);
}

@@ -332,0 +333,0 @@ catch (e) {

{
"name": "@push-rpc/core",
"version": "1.8.0",
"version": "1.8.1",
"main": "dist/index.js",

@@ -21,3 +21,3 @@ "types": "dist/index.d.ts",

},
"gitHead": "8a5ba0d8a336a598746903ab25c1601e5033a765"
"gitHead": "f0936c94754ea6d3f4157be2ea40b4ce6c8ddfbb"
}

@@ -105,3 +105,5 @@ import {DataConsumer, DataSupplier, MessageType, Topic, TopicImpl} from "./rpc"

const index = subscription.sessions.indexOf(session)
subscription.sessions.splice(index, 1)
if (index >= 0) {
subscription.sessions.splice(index, 1)
}

@@ -108,0 +110,0 @@ if (!subscription.sessions.length) {

@@ -42,3 +42,3 @@ import {LocalTopicImpl} from "./local"

public remote: any
public subscriptions: {topic; params}[] = []
public subscriptions: {topic: LocalTopicImpl<unknown, unknown>; params: unknown}[] = []
public lastMessageAt: number

@@ -425,2 +425,6 @@

// topic.subscribeSession will subscribe even when throwing the error, so lets add it to the list
this.subscriptions.push({topic, params})
this.listeners.subscribed(this.subscriptions.length)
const subscribeTopic = (p = params) => topic.subscribeSession(this, p, messageId, ctx)

@@ -430,5 +434,2 @@ const r = await this.localMiddleware(ctx, subscribeTopic, params, MessageType.Subscribe)

this.send(MessageType.Data, messageId, topic.getTopicName(), cloneParams(params), r)
this.subscriptions.push({topic, params})
this.listeners.subscribed(this.subscriptions.length)
} catch (e) {

@@ -435,0 +436,0 @@ log.error(`Unable to subscribe to topic ${topic.getTopicName()}`, e)

@@ -5,2 +5,4 @@ import {assert} from "chai"

import {createNodeWebsocket} from "../../websocket/src"
import {wrapWebsocket} from "../../websocket/src/server"
import WebSocket from "ws"

@@ -117,2 +119,43 @@ describe("Topic bugs", () => {

})
it("exception in supplier leaves session referenced on unsubscribe", async () => {
const services = {
item: new LocalTopicImpl(async () => {
throw new Error()
}),
}
await startTestServer(services)
let ws
const client = await createRpcClient(async () => {
ws = new WebSocket(`ws://localhost:${TEST_PORT}`)
return wrapWebsocket(ws)
})
client.remote.item
.subscribe(() => {}, {})
.catch(e => {
// ignored
})
// pause the socket so that the server doesn't get the unsubscribe message
ws.send = () => {}
await new Promise(r => setTimeout(r, 20))
assert.equal(1, Object.keys(services.item["subscriptions"]).length)
assert.equal(1, Object.values(services.item["subscriptions"])[0].sessions.length)
const [session] = Object.values(services.item["subscriptions"])[0].sessions
assert.equal(1, session.subscriptions.length)
await client.disconnect()
// time to cleanup
await new Promise(r => setTimeout(r, 100))
assert.equal(0, Object.keys(services.item["subscriptions"]).length)
})
})
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc