Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

@silenteer/natsu-port-server

Package Overview
Dependencies
Maintainers
2
Versions
50
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@silenteer/natsu-port-server - npm Package Compare versions

Comparing version 0.0.10 to 0.0.11

5

dist/index.js

@@ -63,3 +63,5 @@ "use strict";

const connectionId = (0, crypto_1.randomUUID)();
connection.socket.on('close', () => service_nats_1.default.unsubscribeAllSubjects(connectionId));
connection.socket.on('close', () => {
service_nats_1.default.unsubscribeAllSubjects(connectionId);
});
connection.socket.on('message', (message) => (0, tslib_1.__awaiter)(this, void 0, void 0, function* () {

@@ -214,3 +216,2 @@ let wsRequest;

const { connection, response } = params;
console.log('Sending ', response);
if (response === null || response === void 0 ? void 0 : response.subject) {

@@ -217,0 +218,0 @@ connection.socket.send(JSON.stringify(response));

9

dist/service-nats.d.ts

@@ -16,4 +16,3 @@ import type { RequestOptions } from 'nats';

subject: string;
}): void;
declare function unsubscribeAllSubjects(connectionId: string): void;
}): Promise<void>;
declare function encodeBody(body: unknown): string;

@@ -23,5 +22,5 @@ declare function decodeBody(body: string): unknown;

request: typeof request;
subscribe: typeof subscribe;
unsubscribe: typeof unsubscribe;
unsubscribeAllSubjects: typeof unsubscribeAllSubjects;
subscribe: (params: Parameters<typeof subscribe>[0]) => void;
unsubscribe: (params: Parameters<typeof unsubscribe>[0]) => void;
unsubscribeAllSubjects: (connectionId: string) => void;
encodeBody: typeof encodeBody;

@@ -28,0 +27,0 @@ decodeBody: typeof decodeBody;

@@ -6,2 +6,25 @@ "use strict";

const configuration_1 = (0, tslib_1.__importDefault)(require("./configuration"));
class Queue {
constructor(_onProcess) {
this._onProcess = _onProcess;
this._queue = [];
}
add(params) {
this._queue.unshift(params);
if (!this._isProcessing) {
this._process();
}
}
_process() {
if (this._queue.length === 0) {
return;
}
this._isProcessing = true;
const params = this._queue.pop();
this._onProcess(params).then(() => {
this._isProcessing = false;
this._process();
});
}
}
const subscriptions = {};

@@ -32,10 +55,10 @@ let natsConnection;

const { connectionId, subject, onHandle } = params;
if ((_b = (_a = subscriptions[subject]) === null || _a === void 0 ? void 0 : _a.connections) === null || _b === void 0 ? void 0 : _b.some((item) => {
item.connectionId === connectionId;
})) {
if ((_b = (_a = subscriptions[subject]) === null || _a === void 0 ? void 0 : _a.connections) === null || _b === void 0 ? void 0 : _b.some((item) => item.connectionId === connectionId)) {
return;
}
let shouldSubscribe;
if (!((_c = subscriptions[subject]) === null || _c === void 0 ? void 0 : _c.subscription)) {
const subscription = (yield getConnection()).subscribe(subject);
subscriptions[subject] = { subscription, connections: [] };
shouldSubscribe = true;
}

@@ -46,8 +69,12 @@ subscriptions[subject].connections = [

];
if (!shouldSubscribe) {
return;
}
const codec = (0, nats_1.JSONCodec)();
(() => (0, tslib_1.__awaiter)(this, void 0, void 0, function* () {
var e_1, _d;
var _e, _f;
try {
for (var _e = (0, tslib_1.__asyncValues)(subscriptions[subject].subscription), _f; _f = yield _e.next(), !_f.done;) {
const message = _f.value;
for (var _g = (0, tslib_1.__asyncValues)(subscriptions[subject].subscription), _h; _h = yield _g.next(), !_h.done;) {
const message = _h.value;
try {

@@ -67,3 +94,3 @@ const data = message.data ? codec.decode(message.data) : undefined;

console.error(error);
subscriptions[subject].connections.forEach(({ onHandle }) => {
(_f = (_e = subscriptions[subject]) === null || _e === void 0 ? void 0 : _e.connections) === null || _f === void 0 ? void 0 : _f.forEach(({ onHandle }) => {
onHandle({

@@ -80,3 +107,3 @@ subject,

try {
if (_f && !_f.done && (_d = _e.return)) yield _d.call(_e);
if (_h && !_h.done && (_d = _g.return)) yield _d.call(_g);
}

@@ -89,17 +116,12 @@ finally { if (e_1) throw e_1.error; }

function unsubscribe(params) {
const { connectionId, subject } = params;
if (!subscriptions[subject]) {
return;
}
subscriptions[subject].connections = subscriptions[subject].connections.filter((item) => item.connectionId !== connectionId);
if (subscriptions[subject].connections.length === 0) {
subscriptions[subject].subscription.unsubscribe();
delete subscriptions[subject];
}
}
function unsubscribeAllSubjects(connectionId) {
Object.entries(subscriptions).forEach(([subject, { connections }]) => {
if (connections.some((item) => item.connectionId === connectionId)) {
unsubscribe({ connectionId, subject });
return (0, tslib_1.__awaiter)(this, void 0, void 0, function* () {
const { connectionId, subject } = params;
if (!subscriptions[subject]) {
return;
}
subscriptions[subject].connections = subscriptions[subject].connections.filter((item) => item.connectionId !== connectionId);
if (subscriptions[subject].connections.length === 0) {
yield subscriptions[subject].subscription.drain();
delete subscriptions[subject];
}
});

@@ -115,7 +137,15 @@ }

}
const subscriptionQueue = new Queue(subscribe);
const unsubscriptionQueue = new Queue(unsubscribe);
exports.default = {
request,
subscribe,
unsubscribe,
unsubscribeAllSubjects,
subscribe: (params) => subscriptionQueue.add(params),
unsubscribe: (params) => unsubscriptionQueue.add(params),
unsubscribeAllSubjects: (connectionId) => {
for (const [subject, { connections }] of Object.entries(subscriptions)) {
if (connections.some((item) => item.connectionId === connectionId)) {
unsubscriptionQueue.add({ connectionId, subject });
}
}
},
encodeBody,

@@ -122,0 +152,0 @@ decodeBody,

{
"name": "@silenteer/natsu-port-server",
"version": "0.0.10",
"version": "0.0.11",
"license": "MIT",

@@ -5,0 +5,0 @@ "private": false,

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc