Socket
Socket
Sign inDemoInstall

@push-rpc/core

Package Overview
Dependencies
Maintainers
1
Versions
62
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@push-rpc/core - npm Package Compare versions

Comparing version 1.8.2 to 1.8.3

35

dist/remote.js

@@ -24,9 +24,24 @@ "use strict";

}
const alreadySubscribed = !!this.consumers[paramsKey];
this.consumers[paramsKey] = [...(this.consumers[paramsKey] || []), { consumer, subscriptionKey }];
try {
await this.session.callRemote(this.topicName, filter, rpc_1.MessageType.Subscribe, callOpts);
if (alreadySubscribed) {
// refetch latest data
try {
const data = await this.session.callRemote(this.topicName, filter, rpc_1.MessageType.Get, callOpts);
this.receiveData(filter, data);
}
catch (e) {
this.unsubscribe(filter, subscriptionKey);
throw e;
}
}
catch (e) {
this.unsubscribe(filter, subscriptionKey);
throw e;
else {
// do not subscribe if already subscribed
try {
await this.session.callRemote(this.topicName, filter, rpc_1.MessageType.Subscribe, callOpts);
}
catch (e) {
this.unsubscribe(filter, subscriptionKey);
throw e;
}
}

@@ -39,5 +54,10 @@ return subscriptionKey;

return;
// session.send and not session.callRemote because unsubscribe doesn't yield any response from the server side
this.session.send(rpc_1.MessageType.Unsubscribe, (0, utils_1.createMessageId)(), this.topicName, params);
const subscriptions = this.consumers[paramsKey];
if (subscriptionKey == null) {
if (subscriptions.length > 0) {
this.deleteAllSubscriptions(paramsKey);
this.session.send(rpc_1.MessageType.Unsubscribe, (0, utils_1.createMessageId)(), this.topicName, params);
}
return;
}
const idx = subscriptions.findIndex(s => s.subscriptionKey == subscriptionKey);

@@ -50,2 +70,3 @@ if (idx >= 0) {

this.deleteAllSubscriptions(paramsKey);
this.session.send(rpc_1.MessageType.Unsubscribe, (0, utils_1.createMessageId)(), this.topicName, params);
}

@@ -52,0 +73,0 @@ }

4

package.json
{
"name": "@push-rpc/core",
"version": "1.8.2",
"version": "1.8.3",
"main": "dist/index.js",

@@ -21,3 +21,3 @@ "types": "dist/index.d.ts",

},
"gitHead": "a8a80fd6b1683a00273d3889fbbdb14387d7edbc"
"gitHead": "4b2411a10a39ebe9a83b4939adfbde1a5618c1c3"
}

@@ -35,9 +35,29 @@ import {CallOptions, DataConsumer, LocalTopic, MessageType, RemoteTopic, TopicImpl} from "./rpc"

const alreadySubscribed = !!this.consumers[paramsKey]
this.consumers[paramsKey] = [...(this.consumers[paramsKey] || []), {consumer, subscriptionKey}]
try {
await this.session.callRemote(this.topicName, filter, MessageType.Subscribe, callOpts)
} catch (e) {
this.unsubscribe(filter, subscriptionKey)
throw e
if (alreadySubscribed) {
// refetch latest data
try {
const data = await this.session.callRemote(
this.topicName,
filter,
MessageType.Get,
callOpts
)
this.receiveData(filter, data)
} catch (e) {
this.unsubscribe(filter, subscriptionKey)
throw e
}
} else {
// do not subscribe if already subscribed
try {
await this.session.callRemote(this.topicName, filter, MessageType.Subscribe, callOpts)
} catch (e) {
this.unsubscribe(filter, subscriptionKey)
throw e
}
}

@@ -53,8 +73,16 @@

// session.send and not session.callRemote because unsubscribe doesn't yield any response from the server side
this.session.send(MessageType.Unsubscribe, createMessageId(), this.topicName, params)
const subscriptions = this.consumers[paramsKey]
if (subscriptionKey == null) {
if (subscriptions.length > 0) {
this.deleteAllSubscriptions(paramsKey)
this.session.send(MessageType.Unsubscribe, createMessageId(), this.topicName, params)
}
return
}
const idx = subscriptions.findIndex(s => s.subscriptionKey == subscriptionKey)
if (idx >= 0) {

@@ -65,2 +93,4 @@ if (subscriptions.length > 1) {

this.deleteAllSubscriptions(paramsKey)
this.session.send(MessageType.Unsubscribe, createMessageId(), this.topicName, params)
}

@@ -67,0 +97,0 @@ }

import {assert} from "chai"
import {startTestServer, TEST_PORT} from "./testUtils"
import {createTestClient, startTestServer, TEST_PORT} from "./testUtils"
import {createRpcClient, LocalTopicImpl} from "../src"

@@ -23,3 +23,3 @@ import {createNodeWebsocket} from "../../websocket/src"

await client.remote.item.subscribe(() => {}, {})
await client.remote.item.subscribe(() => {}, {}, "1")
await new Promise(r => setTimeout(r, 20))

@@ -29,8 +29,8 @@ assert.equal(1, Object.keys(services.item["subscriptions"]).length)

await client.remote.item.subscribe(() => {}, {})
await client.remote.item.subscribe(() => {}, {}, "2")
await new Promise(r => setTimeout(r, 20))
assert.equal(1, Object.keys(services.item["subscriptions"]).length)
assert.equal(2, Object.values(services.item["subscriptions"])[0].sessions.length)
// assert.equal(2, Object.values(services.item["subscriptions"])[0].sessions.length)
await client.remote.item.unsubscribe({})
await client.remote.item.unsubscribe({}, "1")
await new Promise(r => setTimeout(r, 100))

@@ -40,2 +40,31 @@ assert.equal(1, Object.keys(services.item["subscriptions"]).length)

await client.remote.item.unsubscribe({}, "2")
await new Promise(r => setTimeout(r, 100))
assert.equal(0, Object.keys(services.item["subscriptions"]).length)
})
it("unsubscribe all", async () => {
const services = {
item: new LocalTopicImpl(async () => {
return 1
}),
}
await startTestServer(services)
const client = await createRpcClient(
async () => createNodeWebsocket(`ws://localhost:${TEST_PORT}`),
{}
)
await client.remote.item.subscribe(() => {}, {}, "1")
await new Promise(r => setTimeout(r, 20))
assert.equal(1, Object.keys(services.item["subscriptions"]).length)
assert.equal(1, Object.values(services.item["subscriptions"])[0].sessions.length)
await client.remote.item.subscribe(() => {}, {}, "2")
await new Promise(r => setTimeout(r, 20))
assert.equal(1, Object.keys(services.item["subscriptions"]).length)
assert.equal(1, Object.values(services.item["subscriptions"])[0].sessions.length)
await client.remote.item.unsubscribe({})

@@ -60,3 +89,3 @@ await new Promise(r => setTimeout(r, 100))

await client.remote.item.subscribe(() => {}, {})
await client.remote.item.subscribe(() => {}, {}, "1")
await new Promise(r => setTimeout(r, 20))

@@ -69,5 +98,5 @@ assert.equal(1, Object.keys(services.item["subscriptions"]).length)

assert.equal(1, Object.keys(services.item["subscriptions"]).length)
assert.equal(2, Object.values(services.item["subscriptions"])[0].sessions.length)
assert.equal(1, Object.values(services.item["subscriptions"])[0].sessions.length)
await client.remote.item.unsubscribe({})
await client.remote.item.unsubscribe({}, "1")
await new Promise(r => setTimeout(r, 100))

@@ -123,2 +152,45 @@

})
it("double subscribe unsubscribe bug", async () => {
let delivered = null
const server = {
test: {
item: new LocalTopicImpl(async () => "ok"),
},
}
await startTestServer(server)
const client = await createTestClient()
await client.test.item.subscribe(
r => {
console.log("Got sub 1")
delivered = r
},
{},
"1"
)
await client.test.item.subscribe(
() => {
console.log("Got sub 2")
},
{},
"2"
)
assert.isOk(delivered)
delivered = null
await client.test.item.unsubscribe({}, "2")
console.log("Unsubscribe")
server.test.item.trigger()
console.log("Trigger")
await new Promise(r => setTimeout(r, 200))
assert.isOk(delivered)
delivered = null
})
})
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc