@push-rpc/core
Advanced tools
Comparing version 1.8.2 to 1.8.3
@@ -24,9 +24,24 @@ "use strict"; | ||
} | ||
const alreadySubscribed = !!this.consumers[paramsKey]; | ||
this.consumers[paramsKey] = [...(this.consumers[paramsKey] || []), { consumer, subscriptionKey }]; | ||
try { | ||
await this.session.callRemote(this.topicName, filter, rpc_1.MessageType.Subscribe, callOpts); | ||
if (alreadySubscribed) { | ||
// refetch latest data | ||
try { | ||
const data = await this.session.callRemote(this.topicName, filter, rpc_1.MessageType.Get, callOpts); | ||
this.receiveData(filter, data); | ||
} | ||
catch (e) { | ||
this.unsubscribe(filter, subscriptionKey); | ||
throw e; | ||
} | ||
} | ||
catch (e) { | ||
this.unsubscribe(filter, subscriptionKey); | ||
throw e; | ||
else { | ||
// do not subscribe if already subscribed | ||
try { | ||
await this.session.callRemote(this.topicName, filter, rpc_1.MessageType.Subscribe, callOpts); | ||
} | ||
catch (e) { | ||
this.unsubscribe(filter, subscriptionKey); | ||
throw e; | ||
} | ||
} | ||
@@ -39,5 +54,10 @@ return subscriptionKey; | ||
return; | ||
// session.send and not session.callRemote because unsubscribe doesn't yield any response from the server side | ||
this.session.send(rpc_1.MessageType.Unsubscribe, (0, utils_1.createMessageId)(), this.topicName, params); | ||
const subscriptions = this.consumers[paramsKey]; | ||
if (subscriptionKey == null) { | ||
if (subscriptions.length > 0) { | ||
this.deleteAllSubscriptions(paramsKey); | ||
this.session.send(rpc_1.MessageType.Unsubscribe, (0, utils_1.createMessageId)(), this.topicName, params); | ||
} | ||
return; | ||
} | ||
const idx = subscriptions.findIndex(s => s.subscriptionKey == subscriptionKey); | ||
@@ -50,2 +70,3 @@ if (idx >= 0) { | ||
this.deleteAllSubscriptions(paramsKey); | ||
this.session.send(rpc_1.MessageType.Unsubscribe, (0, utils_1.createMessageId)(), this.topicName, params); | ||
} | ||
@@ -52,0 +73,0 @@ } |
{ | ||
"name": "@push-rpc/core", | ||
"version": "1.8.2", | ||
"version": "1.8.3", | ||
"main": "dist/index.js", | ||
@@ -21,3 +21,3 @@ "types": "dist/index.d.ts", | ||
}, | ||
"gitHead": "a8a80fd6b1683a00273d3889fbbdb14387d7edbc" | ||
"gitHead": "4b2411a10a39ebe9a83b4939adfbde1a5618c1c3" | ||
} |
@@ -35,9 +35,29 @@ import {CallOptions, DataConsumer, LocalTopic, MessageType, RemoteTopic, TopicImpl} from "./rpc" | ||
const alreadySubscribed = !!this.consumers[paramsKey] | ||
this.consumers[paramsKey] = [...(this.consumers[paramsKey] || []), {consumer, subscriptionKey}] | ||
try { | ||
await this.session.callRemote(this.topicName, filter, MessageType.Subscribe, callOpts) | ||
} catch (e) { | ||
this.unsubscribe(filter, subscriptionKey) | ||
throw e | ||
if (alreadySubscribed) { | ||
// refetch latest data | ||
try { | ||
const data = await this.session.callRemote( | ||
this.topicName, | ||
filter, | ||
MessageType.Get, | ||
callOpts | ||
) | ||
this.receiveData(filter, data) | ||
} catch (e) { | ||
this.unsubscribe(filter, subscriptionKey) | ||
throw e | ||
} | ||
} else { | ||
// do not subscribe if already subscribed | ||
try { | ||
await this.session.callRemote(this.topicName, filter, MessageType.Subscribe, callOpts) | ||
} catch (e) { | ||
this.unsubscribe(filter, subscriptionKey) | ||
throw e | ||
} | ||
} | ||
@@ -53,8 +73,16 @@ | ||
// session.send and not session.callRemote because unsubscribe doesn't yield any response from the server side | ||
this.session.send(MessageType.Unsubscribe, createMessageId(), this.topicName, params) | ||
const subscriptions = this.consumers[paramsKey] | ||
if (subscriptionKey == null) { | ||
if (subscriptions.length > 0) { | ||
this.deleteAllSubscriptions(paramsKey) | ||
this.session.send(MessageType.Unsubscribe, createMessageId(), this.topicName, params) | ||
} | ||
return | ||
} | ||
const idx = subscriptions.findIndex(s => s.subscriptionKey == subscriptionKey) | ||
if (idx >= 0) { | ||
@@ -65,2 +93,4 @@ if (subscriptions.length > 1) { | ||
this.deleteAllSubscriptions(paramsKey) | ||
this.session.send(MessageType.Unsubscribe, createMessageId(), this.topicName, params) | ||
} | ||
@@ -67,0 +97,0 @@ } |
import {assert} from "chai" | ||
import {startTestServer, TEST_PORT} from "./testUtils" | ||
import {createTestClient, startTestServer, TEST_PORT} from "./testUtils" | ||
import {createRpcClient, LocalTopicImpl} from "../src" | ||
@@ -23,3 +23,3 @@ import {createNodeWebsocket} from "../../websocket/src" | ||
await client.remote.item.subscribe(() => {}, {}) | ||
await client.remote.item.subscribe(() => {}, {}, "1") | ||
await new Promise(r => setTimeout(r, 20)) | ||
@@ -29,8 +29,8 @@ assert.equal(1, Object.keys(services.item["subscriptions"]).length) | ||
await client.remote.item.subscribe(() => {}, {}) | ||
await client.remote.item.subscribe(() => {}, {}, "2") | ||
await new Promise(r => setTimeout(r, 20)) | ||
assert.equal(1, Object.keys(services.item["subscriptions"]).length) | ||
assert.equal(2, Object.values(services.item["subscriptions"])[0].sessions.length) | ||
// assert.equal(2, Object.values(services.item["subscriptions"])[0].sessions.length) | ||
await client.remote.item.unsubscribe({}) | ||
await client.remote.item.unsubscribe({}, "1") | ||
await new Promise(r => setTimeout(r, 100)) | ||
@@ -40,2 +40,31 @@ assert.equal(1, Object.keys(services.item["subscriptions"]).length) | ||
await client.remote.item.unsubscribe({}, "2") | ||
await new Promise(r => setTimeout(r, 100)) | ||
assert.equal(0, Object.keys(services.item["subscriptions"]).length) | ||
}) | ||
it("unsubscribe all", async () => { | ||
const services = { | ||
item: new LocalTopicImpl(async () => { | ||
return 1 | ||
}), | ||
} | ||
await startTestServer(services) | ||
const client = await createRpcClient( | ||
async () => createNodeWebsocket(`ws://localhost:${TEST_PORT}`), | ||
{} | ||
) | ||
await client.remote.item.subscribe(() => {}, {}, "1") | ||
await new Promise(r => setTimeout(r, 20)) | ||
assert.equal(1, Object.keys(services.item["subscriptions"]).length) | ||
assert.equal(1, Object.values(services.item["subscriptions"])[0].sessions.length) | ||
await client.remote.item.subscribe(() => {}, {}, "2") | ||
await new Promise(r => setTimeout(r, 20)) | ||
assert.equal(1, Object.keys(services.item["subscriptions"]).length) | ||
assert.equal(1, Object.values(services.item["subscriptions"])[0].sessions.length) | ||
await client.remote.item.unsubscribe({}) | ||
@@ -60,3 +89,3 @@ await new Promise(r => setTimeout(r, 100)) | ||
await client.remote.item.subscribe(() => {}, {}) | ||
await client.remote.item.subscribe(() => {}, {}, "1") | ||
await new Promise(r => setTimeout(r, 20)) | ||
@@ -69,5 +98,5 @@ assert.equal(1, Object.keys(services.item["subscriptions"]).length) | ||
assert.equal(1, Object.keys(services.item["subscriptions"]).length) | ||
assert.equal(2, Object.values(services.item["subscriptions"])[0].sessions.length) | ||
assert.equal(1, Object.values(services.item["subscriptions"])[0].sessions.length) | ||
await client.remote.item.unsubscribe({}) | ||
await client.remote.item.unsubscribe({}, "1") | ||
await new Promise(r => setTimeout(r, 100)) | ||
@@ -123,2 +152,45 @@ | ||
}) | ||
it("double subscribe unsubscribe bug", async () => { | ||
let delivered = null | ||
const server = { | ||
test: { | ||
item: new LocalTopicImpl(async () => "ok"), | ||
}, | ||
} | ||
await startTestServer(server) | ||
const client = await createTestClient() | ||
await client.test.item.subscribe( | ||
r => { | ||
console.log("Got sub 1") | ||
delivered = r | ||
}, | ||
{}, | ||
"1" | ||
) | ||
await client.test.item.subscribe( | ||
() => { | ||
console.log("Got sub 2") | ||
}, | ||
{}, | ||
"2" | ||
) | ||
assert.isOk(delivered) | ||
delivered = null | ||
await client.test.item.unsubscribe({}, "2") | ||
console.log("Unsubscribe") | ||
server.test.item.trigger() | ||
console.log("Trigger") | ||
await new Promise(r => setTimeout(r, 200)) | ||
assert.isOk(delivered) | ||
delivered = null | ||
}) | ||
}) |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
155068
4492