@silenteer/natsu-port-server
Advanced tools
Comparing version 0.0.10 to 0.0.11
@@ -63,3 +63,5 @@ "use strict"; | ||
const connectionId = (0, crypto_1.randomUUID)(); | ||
connection.socket.on('close', () => service_nats_1.default.unsubscribeAllSubjects(connectionId)); | ||
connection.socket.on('close', () => { | ||
service_nats_1.default.unsubscribeAllSubjects(connectionId); | ||
}); | ||
connection.socket.on('message', (message) => (0, tslib_1.__awaiter)(this, void 0, void 0, function* () { | ||
@@ -214,3 +216,2 @@ let wsRequest; | ||
const { connection, response } = params; | ||
console.log('Sending ', response); | ||
if (response === null || response === void 0 ? void 0 : response.subject) { | ||
@@ -217,0 +218,0 @@ connection.socket.send(JSON.stringify(response)); |
@@ -16,4 +16,3 @@ import type { RequestOptions } from 'nats'; | ||
subject: string; | ||
}): void; | ||
declare function unsubscribeAllSubjects(connectionId: string): void; | ||
}): Promise<void>; | ||
declare function encodeBody(body: unknown): string; | ||
@@ -23,5 +22,5 @@ declare function decodeBody(body: string): unknown; | ||
request: typeof request; | ||
subscribe: typeof subscribe; | ||
unsubscribe: typeof unsubscribe; | ||
unsubscribeAllSubjects: typeof unsubscribeAllSubjects; | ||
subscribe: (params: Parameters<typeof subscribe>[0]) => void; | ||
unsubscribe: (params: Parameters<typeof unsubscribe>[0]) => void; | ||
unsubscribeAllSubjects: (connectionId: string) => void; | ||
encodeBody: typeof encodeBody; | ||
@@ -28,0 +27,0 @@ decodeBody: typeof decodeBody; |
@@ -6,2 +6,25 @@ "use strict"; | ||
const configuration_1 = (0, tslib_1.__importDefault)(require("./configuration")); | ||
class Queue { | ||
constructor(_onProcess) { | ||
this._onProcess = _onProcess; | ||
this._queue = []; | ||
} | ||
add(params) { | ||
this._queue.unshift(params); | ||
if (!this._isProcessing) { | ||
this._process(); | ||
} | ||
} | ||
_process() { | ||
if (this._queue.length === 0) { | ||
return; | ||
} | ||
this._isProcessing = true; | ||
const params = this._queue.pop(); | ||
this._onProcess(params).then(() => { | ||
this._isProcessing = false; | ||
this._process(); | ||
}); | ||
} | ||
} | ||
const subscriptions = {}; | ||
@@ -32,10 +55,10 @@ let natsConnection; | ||
const { connectionId, subject, onHandle } = params; | ||
if ((_b = (_a = subscriptions[subject]) === null || _a === void 0 ? void 0 : _a.connections) === null || _b === void 0 ? void 0 : _b.some((item) => { | ||
item.connectionId === connectionId; | ||
})) { | ||
if ((_b = (_a = subscriptions[subject]) === null || _a === void 0 ? void 0 : _a.connections) === null || _b === void 0 ? void 0 : _b.some((item) => item.connectionId === connectionId)) { | ||
return; | ||
} | ||
let shouldSubscribe; | ||
if (!((_c = subscriptions[subject]) === null || _c === void 0 ? void 0 : _c.subscription)) { | ||
const subscription = (yield getConnection()).subscribe(subject); | ||
subscriptions[subject] = { subscription, connections: [] }; | ||
shouldSubscribe = true; | ||
} | ||
@@ -46,8 +69,12 @@ subscriptions[subject].connections = [ | ||
]; | ||
if (!shouldSubscribe) { | ||
return; | ||
} | ||
const codec = (0, nats_1.JSONCodec)(); | ||
(() => (0, tslib_1.__awaiter)(this, void 0, void 0, function* () { | ||
var e_1, _d; | ||
var _e, _f; | ||
try { | ||
for (var _e = (0, tslib_1.__asyncValues)(subscriptions[subject].subscription), _f; _f = yield _e.next(), !_f.done;) { | ||
const message = _f.value; | ||
for (var _g = (0, tslib_1.__asyncValues)(subscriptions[subject].subscription), _h; _h = yield _g.next(), !_h.done;) { | ||
const message = _h.value; | ||
try { | ||
@@ -67,3 +94,3 @@ const data = message.data ? codec.decode(message.data) : undefined; | ||
console.error(error); | ||
subscriptions[subject].connections.forEach(({ onHandle }) => { | ||
(_f = (_e = subscriptions[subject]) === null || _e === void 0 ? void 0 : _e.connections) === null || _f === void 0 ? void 0 : _f.forEach(({ onHandle }) => { | ||
onHandle({ | ||
@@ -80,3 +107,3 @@ subject, | ||
try { | ||
if (_f && !_f.done && (_d = _e.return)) yield _d.call(_e); | ||
if (_h && !_h.done && (_d = _g.return)) yield _d.call(_g); | ||
} | ||
@@ -89,17 +116,12 @@ finally { if (e_1) throw e_1.error; } | ||
function unsubscribe(params) { | ||
const { connectionId, subject } = params; | ||
if (!subscriptions[subject]) { | ||
return; | ||
} | ||
subscriptions[subject].connections = subscriptions[subject].connections.filter((item) => item.connectionId !== connectionId); | ||
if (subscriptions[subject].connections.length === 0) { | ||
subscriptions[subject].subscription.unsubscribe(); | ||
delete subscriptions[subject]; | ||
} | ||
} | ||
function unsubscribeAllSubjects(connectionId) { | ||
Object.entries(subscriptions).forEach(([subject, { connections }]) => { | ||
if (connections.some((item) => item.connectionId === connectionId)) { | ||
unsubscribe({ connectionId, subject }); | ||
return (0, tslib_1.__awaiter)(this, void 0, void 0, function* () { | ||
const { connectionId, subject } = params; | ||
if (!subscriptions[subject]) { | ||
return; | ||
} | ||
subscriptions[subject].connections = subscriptions[subject].connections.filter((item) => item.connectionId !== connectionId); | ||
if (subscriptions[subject].connections.length === 0) { | ||
yield subscriptions[subject].subscription.drain(); | ||
delete subscriptions[subject]; | ||
} | ||
}); | ||
@@ -115,7 +137,15 @@ } | ||
} | ||
const subscriptionQueue = new Queue(subscribe); | ||
const unsubscriptionQueue = new Queue(unsubscribe); | ||
exports.default = { | ||
request, | ||
subscribe, | ||
unsubscribe, | ||
unsubscribeAllSubjects, | ||
subscribe: (params) => subscriptionQueue.add(params), | ||
unsubscribe: (params) => unsubscriptionQueue.add(params), | ||
unsubscribeAllSubjects: (connectionId) => { | ||
for (const [subject, { connections }] of Object.entries(subscriptions)) { | ||
if (connections.some((item) => item.connectionId === connectionId)) { | ||
unsubscriptionQueue.add({ connectionId, subject }); | ||
} | ||
} | ||
}, | ||
encodeBody, | ||
@@ -122,0 +152,0 @@ decodeBody, |
{ | ||
"name": "@silenteer/natsu-port-server", | ||
"version": "0.0.10", | ||
"version": "0.0.11", | ||
"license": "MIT", | ||
@@ -5,0 +5,0 @@ "private": false, |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
35164
575