@silenteer/natsu-port-server
Advanced tools
Comparing version 0.0.9 to 0.0.10
23
cli.js
@@ -12,3 +12,4 @@ #!/usr/bin/env node | ||
'--server-port': Number, | ||
'--server-path': String, | ||
'--server-http-path': String, | ||
'--server-ws-path': String, | ||
'--help': Boolean, | ||
@@ -56,3 +57,3 @@ }); | ||
[ | ||
'--server-path', | ||
'--server-http-path', | ||
false, | ||
@@ -63,2 +64,8 @@ '/', | ||
[ | ||
'--server-ws-path', | ||
false, | ||
'/', | ||
`It's an endpoint which server will listen to handle websocket request.\nThe request can ask server to subscribe or unsubscribe to a nats subject`, | ||
], | ||
[ | ||
'--config', | ||
@@ -103,11 +110,11 @@ false, | ||
} | ||
if (args['--server-path']) { | ||
process.env['SERVER_PATH'] = args['--server-path']; | ||
if (args['--server-http-path']) { | ||
process.env['SERVER_HTTP_PATH'] = args['--server-http-path']; | ||
} | ||
if (args['--server-ws-path']) { | ||
process.env['SERVER_WS_PATH'] = args['--server-ws-path']; | ||
} | ||
} | ||
const serverPath = path.join( | ||
process.cwd(), | ||
'node_modules/@silenteer/natsu-port-server/dist/index.js' | ||
); | ||
const serverPath = path.join(__dirname, 'dist/index.js'); | ||
if (!serverPath) { | ||
@@ -114,0 +121,0 @@ throw new Error(`Not found entry file at ${serverPath}`); |
@@ -7,3 +7,4 @@ declare type Config = { | ||
natsPass: string; | ||
path: string; | ||
httpPath: string; | ||
wsPath: string; | ||
port: number; | ||
@@ -10,0 +11,0 @@ }; |
@@ -15,3 +15,4 @@ "use strict"; | ||
natsPass: yup.string().trim().notRequired(), | ||
path: yup.string(), | ||
httpPath: yup.string(), | ||
wsPath: yup.string(), | ||
port: yup.number().lessThan(65000).moreThan(0), | ||
@@ -26,3 +27,4 @@ }); | ||
port: parseInt(process.env.SERVER_PORT) || 8080, | ||
path: process.env.SERVER_PATH || '/', | ||
httpPath: process.env.SERVER_HTTP_PATH || '/', | ||
wsPath: process.env.SERVER_WS_PATH || '/', | ||
}; | ||
@@ -29,0 +31,0 @@ try { |
"use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
const tslib_1 = require("tslib"); | ||
const crypto_1 = require("crypto"); | ||
const yup = (0, tslib_1.__importStar)(require("yup")); | ||
@@ -8,5 +9,6 @@ const nats_1 = require("nats"); | ||
const fastify_cors_1 = (0, tslib_1.__importDefault)(require("fastify-cors")); | ||
const fastify_websocket_1 = (0, tslib_1.__importDefault)(require("fastify-websocket")); | ||
const configuration_1 = (0, tslib_1.__importDefault)(require("./configuration")); | ||
const service_nats_1 = (0, tslib_1.__importDefault)(require("./service-nats")); | ||
const schema = yup.object({ | ||
const httpRequestSchema = yup.object({ | ||
subject: yup.string().trim().required(), | ||
@@ -18,2 +20,8 @@ contentType: yup | ||
}); | ||
const wsRequestSchema = yup.object({ | ||
subject: yup.string().trim().required(), | ||
action: yup | ||
.string() | ||
.oneOf(['subscribe', 'unsubscribe']), | ||
}); | ||
const requestCodec = (0, nats_1.JSONCodec)(); | ||
@@ -27,27 +35,17 @@ const responseCodec = (0, nats_1.JSONCodec)(); | ||
}) | ||
.post(configuration_1.default.path, (request, reply) => (0, tslib_1.__awaiter)(this, void 0, void 0, function* () { | ||
var _a, _b; | ||
.register(fastify_websocket_1.default) | ||
.post(configuration_1.default.httpPath, (request, reply) => (0, tslib_1.__awaiter)(this, void 0, void 0, function* () { | ||
try { | ||
const contentType = request.headers['content-type']; | ||
const subject = request.headers['nats-subject']; | ||
if (!schema.isValidSync({ contentType, subject })) { | ||
const validationResult = validateHttpRequest(request); | ||
if (validationResult.code === 400) { | ||
return400(reply); | ||
return; | ||
} | ||
let natsAuthResponse; | ||
const shouldAuthenticate = ((_a = configuration_1.default.natsAuthSubjects) === null || _a === void 0 ? void 0 : _a.length) > 0 && | ||
!((_b = configuration_1.default.natsNonAuthorizedSubjects) === null || _b === void 0 ? void 0 : _b.includes(subject)); | ||
if (shouldAuthenticate) { | ||
natsAuthResponse = yield sendNatsAuthRequest(request); | ||
if (natsAuthResponse.code !== 200) { | ||
const response = { | ||
code: natsAuthResponse.code, | ||
}; | ||
reply.send(response); | ||
return; | ||
} | ||
const authenticationResult = yield authenticate(request); | ||
if (authenticationResult.code !== 'OK') { | ||
reply.send(authenticationResult.authResponse); | ||
} | ||
const response = yield sendNatsRequest({ | ||
httpRequest: request, | ||
natsAuthResponse, | ||
natsAuthResponse: authenticationResult.authResponse, | ||
}); | ||
@@ -66,2 +64,57 @@ reply.send(response); | ||
})) | ||
.get(configuration_1.default.wsPath, { websocket: true }, (connection, request) => (0, tslib_1.__awaiter)(this, void 0, void 0, function* () { | ||
const connectionId = (0, crypto_1.randomUUID)(); | ||
connection.socket.on('close', () => service_nats_1.default.unsubscribeAllSubjects(connectionId)); | ||
connection.socket.on('message', (message) => (0, tslib_1.__awaiter)(this, void 0, void 0, function* () { | ||
let wsRequest; | ||
try { | ||
wsRequest = JSON.parse(message.toString()); | ||
request.headers['nats-subject'] = wsRequest.subject; | ||
const validationResult = validateWSRequest(wsRequest); | ||
if (validationResult.code === 400) { | ||
const response = { | ||
subject: wsRequest.subject, | ||
code: validationResult.code, | ||
}; | ||
sendWSResponse({ connection, response }); | ||
return; | ||
} | ||
const authenticationResult = yield authenticate(request); | ||
if (authenticationResult.code !== 'OK') { | ||
service_nats_1.default.unsubscribe({ | ||
connectionId, | ||
subject: wsRequest.subject, | ||
}); | ||
connection.destroy(new Error(JSON.stringify({ code: authenticationResult.code }))); | ||
return; | ||
} | ||
if (wsRequest.action === 'subscribe') { | ||
service_nats_1.default.subscribe({ | ||
connectionId, | ||
subject: wsRequest.subject, | ||
onHandle: (response) => { | ||
sendWSResponse({ connection, response }); | ||
}, | ||
}); | ||
} | ||
else if (wsRequest.action === 'unsubscribe') { | ||
service_nats_1.default.unsubscribe({ | ||
connectionId, | ||
subject: wsRequest.subject, | ||
}); | ||
} | ||
else { | ||
connection.destroy(new Error('Unsupported operation')); | ||
} | ||
} | ||
catch (error) { | ||
const response = { | ||
subject: wsRequest === null || wsRequest === void 0 ? void 0 : wsRequest.subject, | ||
code: 500, | ||
body: JSON.stringify(error), | ||
}; | ||
sendWSResponse({ connection, response }); | ||
} | ||
})); | ||
})) | ||
.listen(configuration_1.default.port, '0.0.0.0', (error, address) => { | ||
@@ -75,2 +128,50 @@ if (error) { | ||
} | ||
function validateHttpRequest(request) { | ||
const contentType = request.headers['content-type']; | ||
const subject = request.headers['nats-subject']; | ||
let result; | ||
if (!httpRequestSchema.isValidSync({ contentType, subject })) { | ||
result = { code: 400 }; | ||
return result; | ||
} | ||
result = { code: 'OK' }; | ||
return result; | ||
} | ||
function validateWSRequest(request) { | ||
let result; | ||
if (!wsRequestSchema.isValidSync(request)) { | ||
result = { code: 400 }; | ||
return result; | ||
} | ||
result = { code: 'OK' }; | ||
return result; | ||
} | ||
function authenticate(request) { | ||
var _a, _b; | ||
return (0, tslib_1.__awaiter)(this, void 0, void 0, function* () { | ||
let result; | ||
const subject = request.headers['nats-subject']; | ||
const shouldAuthenticate = ((_a = configuration_1.default.natsAuthSubjects) === null || _a === void 0 ? void 0 : _a.length) > 0 && | ||
!((_b = configuration_1.default.natsNonAuthorizedSubjects) === null || _b === void 0 ? void 0 : _b.includes(subject)); | ||
if (shouldAuthenticate) { | ||
const natsAuthResponse = yield sendNatsAuthRequest(request); | ||
if (natsAuthResponse.code !== 200) { | ||
result = { | ||
code: natsAuthResponse.code, | ||
authResponse: natsAuthResponse, | ||
}; | ||
return result; | ||
} | ||
else { | ||
result = { | ||
code: 'OK', | ||
authResponse: natsAuthResponse, | ||
}; | ||
return result; | ||
} | ||
} | ||
result = { code: 'OK' }; | ||
return result; | ||
}); | ||
} | ||
function sendNatsAuthRequest(request) { | ||
@@ -83,3 +184,6 @@ return (0, tslib_1.__awaiter)(this, void 0, void 0, function* () { | ||
}; | ||
const message = yield service_nats_1.default.request(subject, requestCodec.encode(natsRequest)); | ||
const message = yield service_nats_1.default.request({ | ||
subject, | ||
data: requestCodec.encode(natsRequest), | ||
}); | ||
natsResponse = responseCodec.decode(message.data); | ||
@@ -99,9 +203,12 @@ if (natsResponse.code !== 200) { | ||
headers: natsAuthResponse ? natsAuthResponse.headers : httpRequest.headers, | ||
body: encodeBody((_a = httpRequest.body) === null || _a === void 0 ? void 0 : _a.data), | ||
body: service_nats_1.default.encodeBody((_a = httpRequest.body) === null || _a === void 0 ? void 0 : _a.data), | ||
}; | ||
const message = yield service_nats_1.default.request(httpRequest.headers['nats-subject'], requestCodec.encode(natsRequest)); | ||
const message = yield service_nats_1.default.request({ | ||
subject: httpRequest.headers['nats-subject'], | ||
data: requestCodec.encode(natsRequest), | ||
}); | ||
const natsResponse = responseCodec.decode(message.data); | ||
const portResponse = { | ||
code: natsResponse.code, | ||
body: decodeBody(natsResponse.body), | ||
body: service_nats_1.default.decodeBody(natsResponse.body), | ||
}; | ||
@@ -111,10 +218,9 @@ return portResponse; | ||
} | ||
function encodeBody(body) { | ||
return body | ||
? Buffer.from((0, nats_1.JSONCodec)().encode(body)).toString('base64') | ||
: undefined; | ||
function sendWSResponse(params) { | ||
const { connection, response } = params; | ||
console.log('Sending ', response); | ||
if (response === null || response === void 0 ? void 0 : response.subject) { | ||
connection.socket.send(JSON.stringify(response)); | ||
} | ||
} | ||
function decodeBody(body) { | ||
return body ? (0, nats_1.JSONCodec)().decode(Buffer.from(body, 'base64')) : undefined; | ||
} | ||
function return400(reply) { | ||
@@ -121,0 +227,0 @@ reply.statusCode = 400; |
import type { RequestOptions } from 'nats'; | ||
declare function request(subject: string, data?: Uint8Array, options?: Partial<RequestOptions>): Promise<import("nats").Msg>; | ||
import type { NatsPortWSResponse, NatsPortWSErrorResponse } from '@silenteer/natsu-type'; | ||
declare function request(params: { | ||
subject: string; | ||
data?: Uint8Array; | ||
options?: Partial<RequestOptions>; | ||
}): Promise<import("nats").Msg>; | ||
declare function subscribe(params: { | ||
connectionId: string; | ||
subject: string; | ||
onHandle: (response: NatsPortWSResponse | NatsPortWSErrorResponse) => void; | ||
}): Promise<void>; | ||
declare function unsubscribe(params: { | ||
connectionId: string; | ||
subject: string; | ||
}): void; | ||
declare function unsubscribeAllSubjects(connectionId: string): void; | ||
declare function encodeBody(body: unknown): string; | ||
declare function decodeBody(body: string): unknown; | ||
declare const _default: { | ||
request: typeof request; | ||
subscribe: typeof subscribe; | ||
unsubscribe: typeof unsubscribe; | ||
unsubscribeAllSubjects: typeof unsubscribeAllSubjects; | ||
encodeBody: typeof encodeBody; | ||
decodeBody: typeof decodeBody; | ||
}; | ||
export default _default; |
@@ -6,2 +6,3 @@ "use strict"; | ||
const configuration_1 = (0, tslib_1.__importDefault)(require("./configuration")); | ||
const subscriptions = {}; | ||
let natsConnection; | ||
@@ -21,10 +22,98 @@ function getConnection() { | ||
}; | ||
function request(subject, data, options) { | ||
function request(params) { | ||
return (0, tslib_1.__awaiter)(this, void 0, void 0, function* () { | ||
const { subject, data, options } = params; | ||
return (yield getConnection()).request(subject, data, Object.assign(Object.assign({}, defaultRequestOptions), options)); | ||
}); | ||
} | ||
function subscribe(params) { | ||
var _a, _b, _c; | ||
return (0, tslib_1.__awaiter)(this, void 0, void 0, function* () { | ||
const { connectionId, subject, onHandle } = params; | ||
if ((_b = (_a = subscriptions[subject]) === null || _a === void 0 ? void 0 : _a.connections) === null || _b === void 0 ? void 0 : _b.some((item) => { | ||
item.connectionId === connectionId; | ||
})) { | ||
return; | ||
} | ||
if (!((_c = subscriptions[subject]) === null || _c === void 0 ? void 0 : _c.subscription)) { | ||
const subscription = (yield getConnection()).subscribe(subject); | ||
subscriptions[subject] = { subscription, connections: [] }; | ||
} | ||
subscriptions[subject].connections = [ | ||
...subscriptions[subject].connections, | ||
{ connectionId, onHandle }, | ||
]; | ||
const codec = (0, nats_1.JSONCodec)(); | ||
(() => (0, tslib_1.__awaiter)(this, void 0, void 0, function* () { | ||
var e_1, _d; | ||
try { | ||
for (var _e = (0, tslib_1.__asyncValues)(subscriptions[subject].subscription), _f; _f = yield _e.next(), !_f.done;) { | ||
const message = _f.value; | ||
try { | ||
const data = message.data ? codec.decode(message.data) : undefined; | ||
if (data) { | ||
subscriptions[subject].connections.forEach(({ onHandle }) => { | ||
onHandle({ | ||
subject, | ||
code: data.code, | ||
body: decodeBody(data.body), | ||
}); | ||
}); | ||
} | ||
} | ||
catch (error) { | ||
console.error(error); | ||
subscriptions[subject].connections.forEach(({ onHandle }) => { | ||
onHandle({ | ||
subject, | ||
code: 500, | ||
}); | ||
}); | ||
} | ||
} | ||
} | ||
catch (e_1_1) { e_1 = { error: e_1_1 }; } | ||
finally { | ||
try { | ||
if (_f && !_f.done && (_d = _e.return)) yield _d.call(_e); | ||
} | ||
finally { if (e_1) throw e_1.error; } | ||
} | ||
}))(); | ||
}); | ||
} | ||
function unsubscribe(params) { | ||
const { connectionId, subject } = params; | ||
if (!subscriptions[subject]) { | ||
return; | ||
} | ||
subscriptions[subject].connections = subscriptions[subject].connections.filter((item) => item.connectionId !== connectionId); | ||
if (subscriptions[subject].connections.length === 0) { | ||
subscriptions[subject].subscription.unsubscribe(); | ||
delete subscriptions[subject]; | ||
} | ||
} | ||
function unsubscribeAllSubjects(connectionId) { | ||
Object.entries(subscriptions).forEach(([subject, { connections }]) => { | ||
if (connections.some((item) => item.connectionId === connectionId)) { | ||
unsubscribe({ connectionId, subject }); | ||
} | ||
}); | ||
} | ||
function encodeBody(body) { | ||
return body | ||
? Buffer.from((0, nats_1.JSONCodec)().encode(body)).toString('base64') | ||
: undefined; | ||
} | ||
function decodeBody(body) { | ||
return body ? (0, nats_1.JSONCodec)().decode(Buffer.from(body, 'base64')) : undefined; | ||
} | ||
exports.default = { | ||
request, | ||
subscribe, | ||
unsubscribe, | ||
unsubscribeAllSubjects, | ||
encodeBody, | ||
decodeBody, | ||
}; | ||
//# sourceMappingURL=service-nats.js.map |
{ | ||
"name": "@silenteer/natsu-port-server", | ||
"version": "0.0.9", | ||
"version": "0.0.10", | ||
"license": "MIT", | ||
@@ -23,3 +23,3 @@ "private": false, | ||
"dependencies": { | ||
"@silenteer/natsu-type": "0.0.5", | ||
"@silenteer/natsu-type": "0.0.8", | ||
"arg": "5.0.1", | ||
@@ -29,2 +29,3 @@ "cli-table3": "0.6.0", | ||
"fastify-cors": "6.0.2", | ||
"fastify-websocket": "4.0.0", | ||
"nats": "2.2.0", | ||
@@ -31,0 +32,0 @@ "yup": "0.32.9" |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Environment variable access
Supply chain riskPackage accesses environment variables, which may be a sign of credential stuffing or data theft.
Found 2 instances in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Environment variable access
Supply chain riskPackage accesses environment variables, which may be a sign of credential stuffing or data theft.
Found 1 instance in 1 package
33227
545
8
20
+ Addedfastify-websocket@4.0.0
+ Added@silenteer/natsu-type@0.0.8(transitive)
+ Addedfastify-websocket@4.0.0(transitive)
+ Addedws@8.18.0(transitive)
- Removed@silenteer/natsu-type@0.0.5(transitive)
Updated@silenteer/natsu-type@0.0.8