@silenteer/natsu
Advanced tools
Comparing version 1.0.8 to 1.0.9
@@ -1,2 +0,3 @@ | ||
import type { NatsService, NatsHandler } from './type'; | ||
import type { NatsService } from './type'; | ||
import type { NatsHandler } from './type'; | ||
declare const _default: { | ||
@@ -9,2 +10,6 @@ setup: <TInjection extends Record<string, unknown>>(params: { | ||
verbose?: boolean; | ||
namespace?: { | ||
getNamespaceSubject: string; | ||
namespaceSubjects: string[]; | ||
}; | ||
}) => { | ||
@@ -11,0 +16,0 @@ start: () => Promise<void>; |
@@ -7,3 +7,9 @@ "use strict"; | ||
function start(params) { | ||
var _a; | ||
return (0, tslib_1.__awaiter)(this, void 0, void 0, function* () { | ||
if (params.namespace && | ||
(!params.namespace.getNamespaceSubject || | ||
((_a = params.namespace.namespaceSubjects) === null || _a === void 0 ? void 0 : _a.length) === 0)) { | ||
throw new Error(`Wrong config for 'namespace' `); | ||
} | ||
const { urls, user, pass, verbose } = params; | ||
@@ -14,3 +20,3 @@ const key = getClientKey(urls); | ||
} | ||
if (!clients[key].client) { | ||
if (!clients[key].natsService) { | ||
const client = yield (0, nats_1.connect)({ | ||
@@ -24,7 +30,8 @@ servers: urls, | ||
}); | ||
clients[key] = Object.assign(Object.assign({}, clients[key]), { client }); | ||
clients[key] = Object.assign(Object.assign({}, clients[key]), { natsService: createNatsService({ client, namespace: params.namespace }) }); | ||
const requestCodec = (0, nats_1.JSONCodec)(); | ||
const responseCodec = (0, nats_1.JSONCodec)(); | ||
Object.entries(clients[key].handlers).forEach(([subject, handler]) => { | ||
const subcription = client.subscribe(subject); | ||
const natsService = clients[key].natsService; | ||
const subcription = natsService.subscribe(subject); | ||
(() => (0, tslib_1.__awaiter)(this, void 0, void 0, function* () { | ||
@@ -49,3 +56,4 @@ var e_1, _a; | ||
} | ||
const injection = Object.assign(Object.assign({}, params.injections), { message, natsService: client }); | ||
const injection = Object.assign(Object.assign({}, params.injections), { message, | ||
natsService }); | ||
if (handler.validate) { | ||
@@ -114,3 +122,3 @@ const validationResult = yield handler.validate(data, injection); | ||
if (clients[key]) { | ||
yield clients[key].client.drain(); | ||
yield clients[key].natsService.drain(); | ||
delete clients[key]; | ||
@@ -124,3 +132,3 @@ } | ||
const key = getClientKey(urls); | ||
const isStarted = !!((_a = clients[key]) === null || _a === void 0 ? void 0 : _a.client); | ||
const isStarted = !!((_a = clients[key]) === null || _a === void 0 ? void 0 : _a.natsService); | ||
if (isStarted) { | ||
@@ -131,3 +139,3 @@ throw new Error(`Can't register more handler after nats client started`); | ||
clients[key] = { | ||
client: undefined, | ||
natsService: undefined, | ||
handlers: {}, | ||
@@ -137,4 +145,5 @@ }; | ||
handlers.forEach((handler) => { | ||
if (!clients[key].handlers[handler.subject]) { | ||
clients[key].handlers[handler.subject] = handler; | ||
const { subject } = handler; | ||
if (!clients[key].handlers[subject]) { | ||
clients[key].handlers[subject] = handler; | ||
} | ||
@@ -146,2 +155,45 @@ }); | ||
} | ||
function createNatsService(params) { | ||
const { client } = params; | ||
const { getNamespaceSubject, namespaceSubjects } = params.namespace || {}; | ||
return { | ||
request: (subject, data, opts) => (0, tslib_1.__awaiter)(this, void 0, void 0, function* () { | ||
return client.request(subject, (0, nats_1.JSONCodec)().encode(data), opts); | ||
}), | ||
publish: (subject, data, opts) => (0, tslib_1.__awaiter)(this, void 0, void 0, function* () { | ||
const shouldSetNamespace = getNamespaceSubject && (namespaceSubjects === null || namespaceSubjects === void 0 ? void 0 : namespaceSubjects.includes(subject)); | ||
let _subject = subject; | ||
if (shouldSetNamespace) { | ||
try { | ||
const { headers } = data || {}; | ||
const natsRequest = { | ||
headers, | ||
body: encodeBody({ subject }), | ||
}; | ||
const message = yield client.request(getNamespaceSubject, (0, nats_1.JSONCodec)().encode(natsRequest)); | ||
const natsResponse = (0, nats_1.JSONCodec)().decode(message.data); | ||
const { namespace } = (decodeBody(natsResponse.body) || | ||
{}); | ||
if (namespace) { | ||
_subject = `${subject}.${namespace}`; | ||
} | ||
else { | ||
throw new Error(`Namespace is required for subject: ${subject}`); | ||
} | ||
} | ||
catch (error) { | ||
console.error(`Get namespace failed for subject: ${subject}`); | ||
throw error; | ||
} | ||
} | ||
return client.publish(_subject, (0, nats_1.JSONCodec)().encode(data), opts); | ||
}), | ||
subscribe: (subject, opts) => { | ||
return client.subscribe(subject, opts); | ||
}, | ||
drain: () => (0, tslib_1.__awaiter)(this, void 0, void 0, function* () { | ||
return client.drain(); | ||
}), | ||
}; | ||
} | ||
function respond(params) { | ||
@@ -163,5 +215,5 @@ const { message, data } = params; | ||
setup: (params) => { | ||
const { urls, injections, user, pass, verbose } = params; | ||
const { urls, injections, user, pass, verbose, namespace } = params; | ||
const client = { | ||
start: () => start({ urls, injections, user, pass, verbose }), | ||
start: () => start({ urls, injections, user, pass, verbose, namespace }), | ||
stop: () => stop(urls), | ||
@@ -168,0 +220,0 @@ register: (handlers) => register({ urls, handlers }), |
@@ -1,6 +0,11 @@ | ||
import type { Msg, NatsConnection } from 'nats'; | ||
import type { Msg, NatsConnection, PublishOptions, RequestOptions } from 'nats'; | ||
import type { NatsService, NatsRequest, NatsResponse } from '@silenteer/natsu-type'; | ||
declare type NatsInjection = { | ||
message: Msg; | ||
natsService: NatsConnection; | ||
natsService: { | ||
request: (subject: string, data?: NatsRequest, opts?: RequestOptions) => Promise<Msg>; | ||
publish: (subject: string, data?: NatsResponse, opts?: PublishOptions) => Promise<void>; | ||
subscribe: NatsConnection['subscribe']; | ||
drain: NatsConnection['drain']; | ||
}; | ||
}; | ||
@@ -27,3 +32,3 @@ declare type NatsHandleResult<TBody> = { | ||
declare type NatsHandler<TService extends NatsService<string, unknown, unknown>, TInjection extends Record<string, unknown> = Record<string, unknown>> = { | ||
subject: string; | ||
subject: TService['subject']; | ||
validate: NatsValidate<TService, TInjection>; | ||
@@ -30,0 +35,0 @@ authorize: NatsAuthorize<TService, TInjection>; |
{ | ||
"name": "@silenteer/natsu", | ||
"version": "1.0.8", | ||
"version": "1.0.9", | ||
"license": "MIT", | ||
@@ -19,3 +19,3 @@ "private": false, | ||
"dependencies": { | ||
"@silenteer/natsu-type": "0.0.5", | ||
"@silenteer/natsu-type": "0.0.10", | ||
"nats": "^2.2.0" | ||
@@ -22,0 +22,0 @@ }, |
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
28814
22
408
+ Added@silenteer/natsu-type@0.0.10(transitive)
- Removed@silenteer/natsu-type@0.0.5(transitive)
Updated@silenteer/natsu-type@0.0.10