@push-rpc/core
Advanced tools
Comparing version 1.8.1 to 1.8.2
@@ -68,3 +68,9 @@ "use strict"; | ||
this.subscriptions[key] = subscription; | ||
return await this.getData(filter, ctx, session.getConnectionContext()); | ||
try { | ||
return await this.getData(filter, ctx, session.getConnectionContext()); | ||
} | ||
catch (e) { | ||
this.unsubscribeSession(session, filter); | ||
throw e; | ||
} | ||
} | ||
@@ -71,0 +77,0 @@ unsubscribeSession(session, filter) { |
@@ -325,8 +325,7 @@ "use strict"; | ||
const ctx = this.createContext(messageId, topic.getTopicName()); | ||
// topic.subscribeSession will subscribe even when throwing the error, so lets add it to the list | ||
this.subscriptions.push({ topic, params }); | ||
this.listeners.subscribed(this.subscriptions.length); | ||
const subscribeTopic = (p = params) => topic.subscribeSession(this, p, messageId, ctx); | ||
const r = await this.localMiddleware(ctx, subscribeTopic, params, rpc_1.MessageType.Subscribe); | ||
this.send(rpc_1.MessageType.Data, messageId, topic.getTopicName(), cloneParams(params), r); | ||
this.subscriptions.push({ topic, params }); | ||
this.listeners.subscribed(this.subscriptions.length); | ||
} | ||
@@ -333,0 +332,0 @@ catch (e) { |
{ | ||
"name": "@push-rpc/core", | ||
"version": "1.8.1", | ||
"version": "1.8.2", | ||
"main": "dist/index.js", | ||
@@ -21,3 +21,3 @@ "types": "dist/index.d.ts", | ||
}, | ||
"gitHead": "f0936c94754ea6d3f4157be2ea40b4ce6c8ddfbb" | ||
"gitHead": "a8a80fd6b1683a00273d3889fbbdb14387d7edbc" | ||
} |
@@ -95,3 +95,8 @@ import {DataConsumer, DataSupplier, MessageType, Topic, TopicImpl} from "./rpc" | ||
return await this.getData(filter, ctx, session.getConnectionContext()) | ||
try { | ||
return await this.getData(filter, ctx, session.getConnectionContext()) | ||
} catch (e) { | ||
this.unsubscribeSession(session, filter) | ||
throw e | ||
} | ||
} | ||
@@ -98,0 +103,0 @@ |
@@ -424,6 +424,2 @@ import {LocalTopicImpl} from "./local" | ||
// topic.subscribeSession will subscribe even when throwing the error, so lets add it to the list | ||
this.subscriptions.push({topic, params}) | ||
this.listeners.subscribed(this.subscriptions.length) | ||
const subscribeTopic = (p = params) => topic.subscribeSession(this, p, messageId, ctx) | ||
@@ -433,2 +429,5 @@ const r = await this.localMiddleware(ctx, subscribeTopic, params, MessageType.Subscribe) | ||
this.send(MessageType.Data, messageId, topic.getTopicName(), cloneParams(params), r) | ||
this.subscriptions.push({topic, params}) | ||
this.listeners.subscribed(this.subscriptions.length) | ||
} catch (e) { | ||
@@ -435,0 +434,0 @@ log.error(`Unable to subscribe to topic ${topic.getTopicName()}`, e) |
@@ -118,43 +118,2 @@ import {assert} from "chai" | ||
}) | ||
it("exception in supplier leaves session referenced on unsubscribe", async () => { | ||
const services = { | ||
item: new LocalTopicImpl(async () => { | ||
throw new Error() | ||
}), | ||
} | ||
await startTestServer(services) | ||
let ws | ||
const client = await createRpcClient(async () => { | ||
ws = new WebSocket(`ws://localhost:${TEST_PORT}`) | ||
return wrapWebsocket(ws) | ||
}) | ||
client.remote.item | ||
.subscribe(() => {}, {}) | ||
.catch(e => { | ||
// ignored | ||
}) | ||
// pause the socket so that the server doesn't get the unsubscribe message | ||
ws.send = () => {} | ||
await new Promise(r => setTimeout(r, 20)) | ||
assert.equal(1, Object.keys(services.item["subscriptions"]).length) | ||
assert.equal(1, Object.values(services.item["subscriptions"])[0].sessions.length) | ||
const [session] = Object.values(services.item["subscriptions"])[0].sessions | ||
assert.equal(1, session.subscriptions.length) | ||
await client.disconnect() | ||
// time to cleanup | ||
await new Promise(r => setTimeout(r, 100)) | ||
assert.equal(0, Object.keys(services.item["subscriptions"]).length) | ||
}) | ||
}) |
@@ -6,3 +6,5 @@ import {assert} from "chai" | ||
import {createTestClient, startTestServer, TEST_PORT} from "./testUtils" | ||
import {createNodeWebsocket} from "../../websocket/src/server" | ||
import {createNodeWebsocket, wrapWebsocket} from "../../websocket/src/server" | ||
import {RpcSession} from "../src/RpcSession" | ||
import WebSocket from "ws" | ||
@@ -487,2 +489,36 @@ describe("Topics", () => { | ||
}) | ||
it("exception during subscribe do not create subscription", async () => { | ||
// throwing exception from supplier is an indication that topic do not want to subscribe this consumer | ||
const services = { | ||
item: new LocalTopicImpl(async () => { | ||
throw new Error() | ||
}), | ||
} | ||
const server = await startTestServer(services) | ||
let ws | ||
const client = await createRpcClient(async () => { | ||
ws = new WebSocket(`ws://localhost:${TEST_PORT}`) | ||
return wrapWebsocket(ws) | ||
}) | ||
client.remote.item | ||
.subscribe(() => {}, {}) | ||
.catch(e => { | ||
// ignored | ||
}) | ||
// pause the socket so that the server doesn't get the unsubscribe message | ||
ws.send = () => {} | ||
await new Promise(r => setTimeout(r, 20)) | ||
assert.equal(0, Object.keys(services.item["subscriptions"]).length) | ||
const session = Object.values(server["__sessions"])[0] as RpcSession | ||
assert.equal(0, session.subscriptions.length) | ||
}) | ||
}) |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
4390
151812