Socket
Socket
Sign inDemoInstall

@push-rpc/core

Package Overview
Dependencies
Maintainers
1
Versions
62
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@push-rpc/core - npm Package Compare versions

Comparing version 1.8.1 to 1.8.2

8

dist/local.js

@@ -68,3 +68,9 @@ "use strict";

this.subscriptions[key] = subscription;
return await this.getData(filter, ctx, session.getConnectionContext());
try {
return await this.getData(filter, ctx, session.getConnectionContext());
}
catch (e) {
this.unsubscribeSession(session, filter);
throw e;
}
}

@@ -71,0 +77,0 @@ unsubscribeSession(session, filter) {

5

dist/RpcSession.js

@@ -325,8 +325,7 @@ "use strict";

const ctx = this.createContext(messageId, topic.getTopicName());
// topic.subscribeSession will subscribe even when throwing the error, so lets add it to the list
this.subscriptions.push({ topic, params });
this.listeners.subscribed(this.subscriptions.length);
const subscribeTopic = (p = params) => topic.subscribeSession(this, p, messageId, ctx);
const r = await this.localMiddleware(ctx, subscribeTopic, params, rpc_1.MessageType.Subscribe);
this.send(rpc_1.MessageType.Data, messageId, topic.getTopicName(), cloneParams(params), r);
this.subscriptions.push({ topic, params });
this.listeners.subscribed(this.subscriptions.length);
}

@@ -333,0 +332,0 @@ catch (e) {

{
"name": "@push-rpc/core",
"version": "1.8.1",
"version": "1.8.2",
"main": "dist/index.js",

@@ -21,3 +21,3 @@ "types": "dist/index.d.ts",

},
"gitHead": "f0936c94754ea6d3f4157be2ea40b4ce6c8ddfbb"
"gitHead": "a8a80fd6b1683a00273d3889fbbdb14387d7edbc"
}

@@ -95,3 +95,8 @@ import {DataConsumer, DataSupplier, MessageType, Topic, TopicImpl} from "./rpc"

return await this.getData(filter, ctx, session.getConnectionContext())
try {
return await this.getData(filter, ctx, session.getConnectionContext())
} catch (e) {
this.unsubscribeSession(session, filter)
throw e
}
}

@@ -98,0 +103,0 @@

@@ -424,6 +424,2 @@ import {LocalTopicImpl} from "./local"

// topic.subscribeSession will subscribe even when throwing the error, so lets add it to the list
this.subscriptions.push({topic, params})
this.listeners.subscribed(this.subscriptions.length)
const subscribeTopic = (p = params) => topic.subscribeSession(this, p, messageId, ctx)

@@ -433,2 +429,5 @@ const r = await this.localMiddleware(ctx, subscribeTopic, params, MessageType.Subscribe)

this.send(MessageType.Data, messageId, topic.getTopicName(), cloneParams(params), r)
this.subscriptions.push({topic, params})
this.listeners.subscribed(this.subscriptions.length)
} catch (e) {

@@ -435,0 +434,0 @@ log.error(`Unable to subscribe to topic ${topic.getTopicName()}`, e)

@@ -118,43 +118,2 @@ import {assert} from "chai"

})
it("exception in supplier leaves session referenced on unsubscribe", async () => {
const services = {
item: new LocalTopicImpl(async () => {
throw new Error()
}),
}
await startTestServer(services)
let ws
const client = await createRpcClient(async () => {
ws = new WebSocket(`ws://localhost:${TEST_PORT}`)
return wrapWebsocket(ws)
})
client.remote.item
.subscribe(() => {}, {})
.catch(e => {
// ignored
})
// pause the socket so that the server doesn't get the unsubscribe message
ws.send = () => {}
await new Promise(r => setTimeout(r, 20))
assert.equal(1, Object.keys(services.item["subscriptions"]).length)
assert.equal(1, Object.values(services.item["subscriptions"])[0].sessions.length)
const [session] = Object.values(services.item["subscriptions"])[0].sessions
assert.equal(1, session.subscriptions.length)
await client.disconnect()
// time to cleanup
await new Promise(r => setTimeout(r, 100))
assert.equal(0, Object.keys(services.item["subscriptions"]).length)
})
})

@@ -6,3 +6,5 @@ import {assert} from "chai"

import {createTestClient, startTestServer, TEST_PORT} from "./testUtils"
import {createNodeWebsocket} from "../../websocket/src/server"
import {createNodeWebsocket, wrapWebsocket} from "../../websocket/src/server"
import {RpcSession} from "../src/RpcSession"
import WebSocket from "ws"

@@ -487,2 +489,36 @@ describe("Topics", () => {

})
it("exception during subscribe do not create subscription", async () => {
// throwing exception from supplier is an indication that topic do not want to subscribe this consumer
const services = {
item: new LocalTopicImpl(async () => {
throw new Error()
}),
}
const server = await startTestServer(services)
let ws
const client = await createRpcClient(async () => {
ws = new WebSocket(`ws://localhost:${TEST_PORT}`)
return wrapWebsocket(ws)
})
client.remote.item
.subscribe(() => {}, {})
.catch(e => {
// ignored
})
// pause the socket so that the server doesn't get the unsubscribe message
ws.send = () => {}
await new Promise(r => setTimeout(r, 20))
assert.equal(0, Object.keys(services.item["subscriptions"]).length)
const session = Object.values(server["__sessions"])[0] as RpcSession
assert.equal(0, session.subscriptions.length)
})
})
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc