@push-rpc/core
Advanced tools
Comparing version 1.8.0 to 1.8.1
@@ -76,3 +76,5 @@ "use strict"; | ||
const index = subscription.sessions.indexOf(session); | ||
subscription.sessions.splice(index, 1); | ||
if (index >= 0) { | ||
subscription.sessions.splice(index, 1); | ||
} | ||
if (!subscription.sessions.length) { | ||
@@ -79,0 +81,0 @@ delete this.subscriptions[key]; |
@@ -0,1 +1,2 @@ | ||
import { LocalTopicImpl } from "./local"; | ||
import { CallOptions, MessageType, Middleware, RpcConnectionContext, RpcContext } from "./rpc"; | ||
@@ -24,4 +25,4 @@ import { Socket } from "./transport"; | ||
subscriptions: { | ||
topic: any; | ||
params: any; | ||
topic: LocalTopicImpl<unknown, unknown>; | ||
params: unknown; | ||
}[]; | ||
@@ -28,0 +29,0 @@ lastMessageAt: number; |
@@ -325,7 +325,8 @@ "use strict"; | ||
const ctx = this.createContext(messageId, topic.getTopicName()); | ||
// topic.subscribeSession will subscribe even when throwing the error, so lets add it to the list | ||
this.subscriptions.push({ topic, params }); | ||
this.listeners.subscribed(this.subscriptions.length); | ||
const subscribeTopic = (p = params) => topic.subscribeSession(this, p, messageId, ctx); | ||
const r = await this.localMiddleware(ctx, subscribeTopic, params, rpc_1.MessageType.Subscribe); | ||
this.send(rpc_1.MessageType.Data, messageId, topic.getTopicName(), cloneParams(params), r); | ||
this.subscriptions.push({ topic, params }); | ||
this.listeners.subscribed(this.subscriptions.length); | ||
} | ||
@@ -332,0 +333,0 @@ catch (e) { |
{ | ||
"name": "@push-rpc/core", | ||
"version": "1.8.0", | ||
"version": "1.8.1", | ||
"main": "dist/index.js", | ||
@@ -21,3 +21,3 @@ "types": "dist/index.d.ts", | ||
}, | ||
"gitHead": "8a5ba0d8a336a598746903ab25c1601e5033a765" | ||
"gitHead": "f0936c94754ea6d3f4157be2ea40b4ce6c8ddfbb" | ||
} |
@@ -105,3 +105,5 @@ import {DataConsumer, DataSupplier, MessageType, Topic, TopicImpl} from "./rpc" | ||
const index = subscription.sessions.indexOf(session) | ||
subscription.sessions.splice(index, 1) | ||
if (index >= 0) { | ||
subscription.sessions.splice(index, 1) | ||
} | ||
@@ -108,0 +110,0 @@ if (!subscription.sessions.length) { |
@@ -42,3 +42,3 @@ import {LocalTopicImpl} from "./local" | ||
public remote: any | ||
public subscriptions: {topic; params}[] = [] | ||
public subscriptions: {topic: LocalTopicImpl<unknown, unknown>; params: unknown}[] = [] | ||
public lastMessageAt: number | ||
@@ -425,2 +425,6 @@ | ||
// topic.subscribeSession will subscribe even when throwing the error, so lets add it to the list | ||
this.subscriptions.push({topic, params}) | ||
this.listeners.subscribed(this.subscriptions.length) | ||
const subscribeTopic = (p = params) => topic.subscribeSession(this, p, messageId, ctx) | ||
@@ -430,5 +434,2 @@ const r = await this.localMiddleware(ctx, subscribeTopic, params, MessageType.Subscribe) | ||
this.send(MessageType.Data, messageId, topic.getTopicName(), cloneParams(params), r) | ||
this.subscriptions.push({topic, params}) | ||
this.listeners.subscribed(this.subscriptions.length) | ||
} catch (e) { | ||
@@ -435,0 +436,0 @@ log.error(`Unable to subscribe to topic ${topic.getTopicName()}`, e) |
@@ -5,2 +5,4 @@ import {assert} from "chai" | ||
import {createNodeWebsocket} from "../../websocket/src" | ||
import {wrapWebsocket} from "../../websocket/src/server" | ||
import WebSocket from "ws" | ||
@@ -117,2 +119,43 @@ describe("Topic bugs", () => { | ||
}) | ||
it("exception in supplier leaves session referenced on unsubscribe", async () => { | ||
const services = { | ||
item: new LocalTopicImpl(async () => { | ||
throw new Error() | ||
}), | ||
} | ||
await startTestServer(services) | ||
let ws | ||
const client = await createRpcClient(async () => { | ||
ws = new WebSocket(`ws://localhost:${TEST_PORT}`) | ||
return wrapWebsocket(ws) | ||
}) | ||
client.remote.item | ||
.subscribe(() => {}, {}) | ||
.catch(e => { | ||
// ignored | ||
}) | ||
// pause the socket so that the server doesn't get the unsubscribe message | ||
ws.send = () => {} | ||
await new Promise(r => setTimeout(r, 20)) | ||
assert.equal(1, Object.keys(services.item["subscriptions"]).length) | ||
assert.equal(1, Object.values(services.item["subscriptions"])[0].sessions.length) | ||
const [session] = Object.values(services.item["subscriptions"])[0].sessions | ||
assert.equal(1, session.subscriptions.length) | ||
await client.disconnect() | ||
// time to cleanup | ||
await new Promise(r => setTimeout(r, 100)) | ||
assert.equal(0, Object.keys(services.item["subscriptions"]).length) | ||
}) | ||
}) |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
151864
4383