Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

@silenteer/natsu-port-server

Package Overview
Dependencies
Maintainers
2
Versions
50
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@silenteer/natsu-port-server - npm Package Compare versions

Comparing version 0.0.9 to 0.0.10

23

cli.js

@@ -12,3 +12,4 @@ #!/usr/bin/env node

'--server-port': Number,
'--server-path': String,
'--server-http-path': String,
'--server-ws-path': String,
'--help': Boolean,

@@ -56,3 +57,3 @@ });

[
'--server-path',
'--server-http-path',
false,

@@ -63,2 +64,8 @@ '/',

[
'--server-ws-path',
false,
'/',
`It's an endpoint which server will listen to handle websocket request.\nThe request can ask server to subscribe or unsubscribe to a nats subject`,
],
[
'--config',

@@ -103,11 +110,11 @@ false,

}
if (args['--server-path']) {
process.env['SERVER_PATH'] = args['--server-path'];
if (args['--server-http-path']) {
process.env['SERVER_HTTP_PATH'] = args['--server-http-path'];
}
if (args['--server-ws-path']) {
process.env['SERVER_WS_PATH'] = args['--server-ws-path'];
}
}
const serverPath = path.join(
process.cwd(),
'node_modules/@silenteer/natsu-port-server/dist/index.js'
);
const serverPath = path.join(__dirname, 'dist/index.js');
if (!serverPath) {

@@ -114,0 +121,0 @@ throw new Error(`Not found entry file at ${serverPath}`);

@@ -7,3 +7,4 @@ declare type Config = {

natsPass: string;
path: string;
httpPath: string;
wsPath: string;
port: number;

@@ -10,0 +11,0 @@ };

@@ -15,3 +15,4 @@ "use strict";

natsPass: yup.string().trim().notRequired(),
path: yup.string(),
httpPath: yup.string(),
wsPath: yup.string(),
port: yup.number().lessThan(65000).moreThan(0),

@@ -26,3 +27,4 @@ });

port: parseInt(process.env.SERVER_PORT) || 8080,
path: process.env.SERVER_PATH || '/',
httpPath: process.env.SERVER_HTTP_PATH || '/',
wsPath: process.env.SERVER_WS_PATH || '/',
};

@@ -29,0 +31,0 @@ try {

"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
const tslib_1 = require("tslib");
const crypto_1 = require("crypto");
const yup = (0, tslib_1.__importStar)(require("yup"));

@@ -8,5 +9,6 @@ const nats_1 = require("nats");

const fastify_cors_1 = (0, tslib_1.__importDefault)(require("fastify-cors"));
const fastify_websocket_1 = (0, tslib_1.__importDefault)(require("fastify-websocket"));
const configuration_1 = (0, tslib_1.__importDefault)(require("./configuration"));
const service_nats_1 = (0, tslib_1.__importDefault)(require("./service-nats"));
const schema = yup.object({
const httpRequestSchema = yup.object({
subject: yup.string().trim().required(),

@@ -18,2 +20,8 @@ contentType: yup

});
const wsRequestSchema = yup.object({
subject: yup.string().trim().required(),
action: yup
.string()
.oneOf(['subscribe', 'unsubscribe']),
});
const requestCodec = (0, nats_1.JSONCodec)();

@@ -27,27 +35,17 @@ const responseCodec = (0, nats_1.JSONCodec)();

})
.post(configuration_1.default.path, (request, reply) => (0, tslib_1.__awaiter)(this, void 0, void 0, function* () {
var _a, _b;
.register(fastify_websocket_1.default)
.post(configuration_1.default.httpPath, (request, reply) => (0, tslib_1.__awaiter)(this, void 0, void 0, function* () {
try {
const contentType = request.headers['content-type'];
const subject = request.headers['nats-subject'];
if (!schema.isValidSync({ contentType, subject })) {
const validationResult = validateHttpRequest(request);
if (validationResult.code === 400) {
return400(reply);
return;
}
let natsAuthResponse;
const shouldAuthenticate = ((_a = configuration_1.default.natsAuthSubjects) === null || _a === void 0 ? void 0 : _a.length) > 0 &&
!((_b = configuration_1.default.natsNonAuthorizedSubjects) === null || _b === void 0 ? void 0 : _b.includes(subject));
if (shouldAuthenticate) {
natsAuthResponse = yield sendNatsAuthRequest(request);
if (natsAuthResponse.code !== 200) {
const response = {
code: natsAuthResponse.code,
};
reply.send(response);
return;
}
const authenticationResult = yield authenticate(request);
if (authenticationResult.code !== 'OK') {
reply.send(authenticationResult.authResponse);
}
const response = yield sendNatsRequest({
httpRequest: request,
natsAuthResponse,
natsAuthResponse: authenticationResult.authResponse,
});

@@ -66,2 +64,57 @@ reply.send(response);

}))
.get(configuration_1.default.wsPath, { websocket: true }, (connection, request) => (0, tslib_1.__awaiter)(this, void 0, void 0, function* () {
const connectionId = (0, crypto_1.randomUUID)();
connection.socket.on('close', () => service_nats_1.default.unsubscribeAllSubjects(connectionId));
connection.socket.on('message', (message) => (0, tslib_1.__awaiter)(this, void 0, void 0, function* () {
let wsRequest;
try {
wsRequest = JSON.parse(message.toString());
request.headers['nats-subject'] = wsRequest.subject;
const validationResult = validateWSRequest(wsRequest);
if (validationResult.code === 400) {
const response = {
subject: wsRequest.subject,
code: validationResult.code,
};
sendWSResponse({ connection, response });
return;
}
const authenticationResult = yield authenticate(request);
if (authenticationResult.code !== 'OK') {
service_nats_1.default.unsubscribe({
connectionId,
subject: wsRequest.subject,
});
connection.destroy(new Error(JSON.stringify({ code: authenticationResult.code })));
return;
}
if (wsRequest.action === 'subscribe') {
service_nats_1.default.subscribe({
connectionId,
subject: wsRequest.subject,
onHandle: (response) => {
sendWSResponse({ connection, response });
},
});
}
else if (wsRequest.action === 'unsubscribe') {
service_nats_1.default.unsubscribe({
connectionId,
subject: wsRequest.subject,
});
}
else {
connection.destroy(new Error('Unsupported operation'));
}
}
catch (error) {
const response = {
subject: wsRequest === null || wsRequest === void 0 ? void 0 : wsRequest.subject,
code: 500,
body: JSON.stringify(error),
};
sendWSResponse({ connection, response });
}
}));
}))
.listen(configuration_1.default.port, '0.0.0.0', (error, address) => {

@@ -75,2 +128,50 @@ if (error) {

}
function validateHttpRequest(request) {
const contentType = request.headers['content-type'];
const subject = request.headers['nats-subject'];
let result;
if (!httpRequestSchema.isValidSync({ contentType, subject })) {
result = { code: 400 };
return result;
}
result = { code: 'OK' };
return result;
}
function validateWSRequest(request) {
let result;
if (!wsRequestSchema.isValidSync(request)) {
result = { code: 400 };
return result;
}
result = { code: 'OK' };
return result;
}
function authenticate(request) {
var _a, _b;
return (0, tslib_1.__awaiter)(this, void 0, void 0, function* () {
let result;
const subject = request.headers['nats-subject'];
const shouldAuthenticate = ((_a = configuration_1.default.natsAuthSubjects) === null || _a === void 0 ? void 0 : _a.length) > 0 &&
!((_b = configuration_1.default.natsNonAuthorizedSubjects) === null || _b === void 0 ? void 0 : _b.includes(subject));
if (shouldAuthenticate) {
const natsAuthResponse = yield sendNatsAuthRequest(request);
if (natsAuthResponse.code !== 200) {
result = {
code: natsAuthResponse.code,
authResponse: natsAuthResponse,
};
return result;
}
else {
result = {
code: 'OK',
authResponse: natsAuthResponse,
};
return result;
}
}
result = { code: 'OK' };
return result;
});
}
function sendNatsAuthRequest(request) {

@@ -83,3 +184,6 @@ return (0, tslib_1.__awaiter)(this, void 0, void 0, function* () {

};
const message = yield service_nats_1.default.request(subject, requestCodec.encode(natsRequest));
const message = yield service_nats_1.default.request({
subject,
data: requestCodec.encode(natsRequest),
});
natsResponse = responseCodec.decode(message.data);

@@ -99,9 +203,12 @@ if (natsResponse.code !== 200) {

headers: natsAuthResponse ? natsAuthResponse.headers : httpRequest.headers,
body: encodeBody((_a = httpRequest.body) === null || _a === void 0 ? void 0 : _a.data),
body: service_nats_1.default.encodeBody((_a = httpRequest.body) === null || _a === void 0 ? void 0 : _a.data),
};
const message = yield service_nats_1.default.request(httpRequest.headers['nats-subject'], requestCodec.encode(natsRequest));
const message = yield service_nats_1.default.request({
subject: httpRequest.headers['nats-subject'],
data: requestCodec.encode(natsRequest),
});
const natsResponse = responseCodec.decode(message.data);
const portResponse = {
code: natsResponse.code,
body: decodeBody(natsResponse.body),
body: service_nats_1.default.decodeBody(natsResponse.body),
};

@@ -111,10 +218,9 @@ return portResponse;

}
function encodeBody(body) {
return body
? Buffer.from((0, nats_1.JSONCodec)().encode(body)).toString('base64')
: undefined;
function sendWSResponse(params) {
const { connection, response } = params;
console.log('Sending ', response);
if (response === null || response === void 0 ? void 0 : response.subject) {
connection.socket.send(JSON.stringify(response));
}
}
function decodeBody(body) {
return body ? (0, nats_1.JSONCodec)().decode(Buffer.from(body, 'base64')) : undefined;
}
function return400(reply) {

@@ -121,0 +227,0 @@ reply.statusCode = 400;

import type { RequestOptions } from 'nats';
declare function request(subject: string, data?: Uint8Array, options?: Partial<RequestOptions>): Promise<import("nats").Msg>;
import type { NatsPortWSResponse, NatsPortWSErrorResponse } from '@silenteer/natsu-type';
declare function request(params: {
subject: string;
data?: Uint8Array;
options?: Partial<RequestOptions>;
}): Promise<import("nats").Msg>;
declare function subscribe(params: {
connectionId: string;
subject: string;
onHandle: (response: NatsPortWSResponse | NatsPortWSErrorResponse) => void;
}): Promise<void>;
declare function unsubscribe(params: {
connectionId: string;
subject: string;
}): void;
declare function unsubscribeAllSubjects(connectionId: string): void;
declare function encodeBody(body: unknown): string;
declare function decodeBody(body: string): unknown;
declare const _default: {
request: typeof request;
subscribe: typeof subscribe;
unsubscribe: typeof unsubscribe;
unsubscribeAllSubjects: typeof unsubscribeAllSubjects;
encodeBody: typeof encodeBody;
decodeBody: typeof decodeBody;
};
export default _default;

@@ -6,2 +6,3 @@ "use strict";

const configuration_1 = (0, tslib_1.__importDefault)(require("./configuration"));
const subscriptions = {};
let natsConnection;

@@ -21,10 +22,98 @@ function getConnection() {

};
function request(subject, data, options) {
function request(params) {
return (0, tslib_1.__awaiter)(this, void 0, void 0, function* () {
const { subject, data, options } = params;
return (yield getConnection()).request(subject, data, Object.assign(Object.assign({}, defaultRequestOptions), options));
});
}
function subscribe(params) {
var _a, _b, _c;
return (0, tslib_1.__awaiter)(this, void 0, void 0, function* () {
const { connectionId, subject, onHandle } = params;
if ((_b = (_a = subscriptions[subject]) === null || _a === void 0 ? void 0 : _a.connections) === null || _b === void 0 ? void 0 : _b.some((item) => {
item.connectionId === connectionId;
})) {
return;
}
if (!((_c = subscriptions[subject]) === null || _c === void 0 ? void 0 : _c.subscription)) {
const subscription = (yield getConnection()).subscribe(subject);
subscriptions[subject] = { subscription, connections: [] };
}
subscriptions[subject].connections = [
...subscriptions[subject].connections,
{ connectionId, onHandle },
];
const codec = (0, nats_1.JSONCodec)();
(() => (0, tslib_1.__awaiter)(this, void 0, void 0, function* () {
var e_1, _d;
try {
for (var _e = (0, tslib_1.__asyncValues)(subscriptions[subject].subscription), _f; _f = yield _e.next(), !_f.done;) {
const message = _f.value;
try {
const data = message.data ? codec.decode(message.data) : undefined;
if (data) {
subscriptions[subject].connections.forEach(({ onHandle }) => {
onHandle({
subject,
code: data.code,
body: decodeBody(data.body),
});
});
}
}
catch (error) {
console.error(error);
subscriptions[subject].connections.forEach(({ onHandle }) => {
onHandle({
subject,
code: 500,
});
});
}
}
}
catch (e_1_1) { e_1 = { error: e_1_1 }; }
finally {
try {
if (_f && !_f.done && (_d = _e.return)) yield _d.call(_e);
}
finally { if (e_1) throw e_1.error; }
}
}))();
});
}
function unsubscribe(params) {
const { connectionId, subject } = params;
if (!subscriptions[subject]) {
return;
}
subscriptions[subject].connections = subscriptions[subject].connections.filter((item) => item.connectionId !== connectionId);
if (subscriptions[subject].connections.length === 0) {
subscriptions[subject].subscription.unsubscribe();
delete subscriptions[subject];
}
}
function unsubscribeAllSubjects(connectionId) {
Object.entries(subscriptions).forEach(([subject, { connections }]) => {
if (connections.some((item) => item.connectionId === connectionId)) {
unsubscribe({ connectionId, subject });
}
});
}
function encodeBody(body) {
return body
? Buffer.from((0, nats_1.JSONCodec)().encode(body)).toString('base64')
: undefined;
}
function decodeBody(body) {
return body ? (0, nats_1.JSONCodec)().decode(Buffer.from(body, 'base64')) : undefined;
}
exports.default = {
request,
subscribe,
unsubscribe,
unsubscribeAllSubjects,
encodeBody,
decodeBody,
};
//# sourceMappingURL=service-nats.js.map
{
"name": "@silenteer/natsu-port-server",
"version": "0.0.9",
"version": "0.0.10",
"license": "MIT",

@@ -23,3 +23,3 @@ "private": false,

"dependencies": {
"@silenteer/natsu-type": "0.0.5",
"@silenteer/natsu-type": "0.0.8",
"arg": "5.0.1",

@@ -29,2 +29,3 @@ "cli-table3": "0.6.0",

"fastify-cors": "6.0.2",
"fastify-websocket": "4.0.0",
"nats": "2.2.0",

@@ -31,0 +32,0 @@ "yup": "0.32.9"

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc