nats-hemera
Advanced tools
Comparing version 6.1.1 to 7.0.0
116
index.d.ts
@@ -334,3 +334,3 @@ /// <reference types="node" /> | ||
ext( | ||
name: 'onClientPreRequest', | ||
name: 'onAct', | ||
handler: ( | ||
@@ -342,3 +342,3 @@ instance: Hemera<Hemera.ClientRequest, Hemera.ClientResponse>, | ||
ext( | ||
name: 'onClientPreRequest', | ||
name: 'onAct', | ||
handler: ( | ||
@@ -350,3 +350,3 @@ instance: Hemera<Hemera.ClientRequest, Hemera.ClientResponse> | ||
ext( | ||
name: 'onClientPostRequest', | ||
name: 'onActFinished', | ||
handler: ( | ||
@@ -358,3 +358,3 @@ instance: Hemera<Hemera.ClientRequest, Hemera.ClientResponse>, | ||
ext( | ||
name: 'onClientPostRequest', | ||
name: 'onActFinished', | ||
handler: ( | ||
@@ -365,34 +365,24 @@ instance: Hemera<Hemera.ClientRequest, Hemera.ClientResponse> | ||
// server extensions | ||
ext( | ||
name: 'onClientPreRequest', | ||
name: 'preHandler', | ||
handler: ( | ||
instance: Hemera<Hemera.ClientRequest, Hemera.ClientResponse>, | ||
instance: Hemera<Hemera.ServerRequest, Hemera.ServerResponse>, | ||
request: Hemera.ServerRequest, | ||
reply: Hemera.Reply, | ||
next: Hemera.NodeCallback | ||
) => void | ||
): Hemera<Hemera.ClientRequest, Hemera.ClientResponse> | ||
): Hemera<Hemera.ServerRequest, Hemera.ServerResponse> | ||
ext( | ||
name: 'onClientPreRequest', | ||
name: 'preHandler', | ||
handler: ( | ||
instance: Hemera<Hemera.ClientRequest, Hemera.ClientResponse> | ||
instance: Hemera<Hemera.ServerRequest, Hemera.ServerResponse>, | ||
request: Hemera.ServerRequest, | ||
reply: Hemera.Reply | ||
) => Promise<void> | ||
): Hemera<Hemera.ClientRequest, Hemera.ClientResponse> | ||
): Hemera<Hemera.ServerRequest, Hemera.ServerResponse> | ||
ext( | ||
name: 'onClientPostRequest', | ||
name: 'onRequest', | ||
handler: ( | ||
instance: Hemera<Hemera.ClientRequest, Hemera.ClientResponse>, | ||
next: Hemera.NodeCallback | ||
) => void | ||
): Hemera<Hemera.ClientRequest, Hemera.ClientResponse> | ||
ext( | ||
name: 'onClientPostRequest', | ||
handler: ( | ||
instance: Hemera<Hemera.ClientRequest, Hemera.ClientResponse> | ||
) => Promise<void> | ||
): Hemera<Hemera.ClientRequest, Hemera.ClientResponse> | ||
// server extensions | ||
ext( | ||
name: 'onServerPreHandler', | ||
handler: ( | ||
instance: Hemera<Hemera.ServerRequest, Hemera.ServerResponse>, | ||
@@ -405,3 +395,3 @@ request: Hemera.ServerRequest, | ||
ext( | ||
name: 'onServerPreHandler', | ||
name: 'onRequest', | ||
handler: ( | ||
@@ -415,3 +405,3 @@ instance: Hemera<Hemera.ServerRequest, Hemera.ServerResponse>, | ||
ext( | ||
name: 'onServerPreRequest', | ||
name: 'onSend', | ||
handler: ( | ||
@@ -421,7 +411,7 @@ instance: Hemera<Hemera.ServerRequest, Hemera.ServerResponse>, | ||
reply: Hemera.Reply, | ||
next: Hemera.NodeCallback | ||
next: (err?: Error) => void | ||
) => void | ||
): Hemera<Hemera.ServerRequest, Hemera.ServerResponse> | ||
ext( | ||
name: 'onServerPreRequest', | ||
name: 'onSend', | ||
handler: ( | ||
@@ -435,6 +425,5 @@ instance: Hemera<Hemera.ServerRequest, Hemera.ServerResponse>, | ||
ext( | ||
name: 'onServerPreResponse', | ||
name: 'onResponse', | ||
handler: ( | ||
instance: Hemera<Hemera.ServerRequest, Hemera.ServerResponse>, | ||
request: Hemera.ServerRequest, | ||
reply: Hemera.Reply, | ||
@@ -445,6 +434,5 @@ next: (err?: Error) => void | ||
ext( | ||
name: 'onServerPreResponse', | ||
name: 'onResponse', | ||
handler: ( | ||
instance: Hemera<Hemera.ServerRequest, Hemera.ServerResponse>, | ||
request: Hemera.ServerRequest, | ||
reply: Hemera.Reply | ||
@@ -454,52 +442,19 @@ ) => Promise<void> | ||
// events | ||
on( | ||
event: 'clientPreRequest', | ||
ext( | ||
name: 'onError', | ||
handler: ( | ||
this: Hemera<Hemera.ClientRequest, Hemera.ClientResponse>, | ||
instance: Hemera<Hemera.ClientRequest, Hemera.ClientResponse> | ||
instance: Hemera<Hemera.ServerRequest, Hemera.ServerResponse>, | ||
payload: any, | ||
error: Error, | ||
next: (err?: Error) => void | ||
) => void | ||
): Hemera<Request, Response> | ||
on( | ||
event: 'clientPostRequest', | ||
): Hemera<Hemera.ServerRequest, Hemera.ServerResponse> | ||
ext( | ||
name: 'onResponse', | ||
handler: ( | ||
this: Hemera<Hemera.ClientRequest, Hemera.ClientResponse>, | ||
instance: Hemera<Hemera.ClientRequest, Hemera.ClientResponse> | ||
) => void | ||
): Hemera<Request, Response> | ||
on( | ||
event: 'serverPreHandler', | ||
handler: ( | ||
this: Hemera<Hemera.ServerRequest, Hemera.ServerResponse>, | ||
instance: Hemera<Hemera.ServerRequest, Hemera.ServerResponse> | ||
) => void | ||
): Hemera<Request, Response> | ||
on( | ||
event: 'serverPreRequest', | ||
handler: ( | ||
this: Hemera<Hemera.ServerRequest, Hemera.ServerResponse>, | ||
instance: Hemera<Hemera.ServerRequest, Hemera.ServerResponse> | ||
) => void | ||
): Hemera<Request, Response> | ||
on( | ||
event: 'serverPreResponse', | ||
handler: ( | ||
this: Hemera<Hemera.ServerRequest, Hemera.ServerResponse>, | ||
instance: Hemera<Hemera.ServerRequest, Hemera.ServerResponse> | ||
) => void | ||
): Hemera<Request, Response> | ||
on( | ||
event: 'serverResponseError', | ||
handler: ( | ||
this: Hemera<Hemera.ServerRequest, Hemera.ServerResponse>, | ||
instance: Hemera<Hemera.ServerRequest, Hemera.ServerResponse>, | ||
payload: any, | ||
error: Error | ||
) => void | ||
): void | ||
on( | ||
event: 'clientResponseError', | ||
handler: ( | ||
this: Hemera<Hemera.ClientRequest, Hemera.ClientResponse>, | ||
error: Error | ||
) => void | ||
): void | ||
) => Promise<void> | ||
): Hemera<Hemera.ServerRequest, Hemera.ServerResponse> | ||
@@ -540,2 +495,3 @@ ready(): Promise<void> | ||
setNotFoundPattern(pattern: string | Hemera.ServerPattern | null): void | ||
setErrorHandler(handler: (err: Error) => void): void | ||
@@ -542,0 +498,0 @@ setIdGenerator( |
@@ -12,10 +12,5 @@ 'use strict' | ||
const runExt = require('./extensionRunner').extRunner | ||
const { extRunner } = require('./extensionRunner') | ||
const { sAddReceivedMsg } = require('./symbols') | ||
/** | ||
* | ||
* | ||
* @export | ||
* @class Add | ||
*/ | ||
class Add { | ||
@@ -30,12 +25,5 @@ constructor(addDef) { | ||
// only used for maxMessages$ flag | ||
this._receivedMsg = 0 | ||
this[sAddReceivedMsg] = 0 | ||
} | ||
/** | ||
* | ||
* | ||
* @param {any} handler | ||
* | ||
* @memberof Add | ||
*/ | ||
_use(handler) { | ||
@@ -45,10 +33,2 @@ this.middleware.push(handler) | ||
/** | ||
* | ||
* | ||
* @param {any} handler | ||
* @returns | ||
* | ||
* @memberOf Add | ||
*/ | ||
use(handler) { | ||
@@ -64,9 +44,2 @@ if (Array.isArray(handler)) { | ||
/** | ||
* | ||
* | ||
* @param {any} cb | ||
* | ||
* @memberOf Add | ||
*/ | ||
end(cb) { | ||
@@ -76,18 +49,4 @@ this.action = cb | ||
/** | ||
* | ||
* | ||
* @param {any} request | ||
* @param {any} response | ||
* @param {any} cb | ||
* | ||
* @memberof Add | ||
*/ | ||
run(request, response, cb) { | ||
runExt( | ||
this.middleware, | ||
(fn, state, next) => fn(request, response, next), | ||
null, | ||
cb | ||
) | ||
extRunner(this.middleware, (fn, state, next) => fn(request, response, next), null, cb) | ||
} | ||
@@ -94,0 +53,0 @@ } |
const Joi = require('joi') | ||
const Os = require('os') | ||
const Stream = require('stream').Stream | ||
const { Stream } = require('stream') | ||
const Util = require('./util') | ||
@@ -14,3 +14,3 @@ | ||
.integer() | ||
.default(0), | ||
.default(10), | ||
tag: Joi.string().default(''), | ||
@@ -17,0 +17,0 @@ // Enables pretty log formatter in Pino default logger |
@@ -14,33 +14,19 @@ 'use strict' | ||
/** | ||
* @class ExtensionManager | ||
*/ | ||
class ExtensionManager { | ||
constructor() { | ||
this._stack = [] | ||
this._types = [ | ||
'onClientPreRequest', | ||
'onClientPostRequest', | ||
'onServerPreHandler', | ||
'onServerPreRequest', | ||
'onServerPreResponse' | ||
] | ||
this.onClientPreRequest = [] | ||
this.onClientPostRequest = [] | ||
this._types = ['onAct', 'onActFinished', 'preHandler', 'onRequest', 'onSend', 'onResponse', 'onError'] | ||
this.onAct = [] | ||
this.onActFinished = [] | ||
this.onServerPreHandler = [] | ||
this.onServerPreRequest = [] | ||
this.onServerPreResponse = [] | ||
this.preHandler = [] | ||
this.onRequest = [] | ||
this.onResponse = [] | ||
this.onSend = [] | ||
this.onError = [] | ||
} | ||
/** | ||
* | ||
* | ||
* @param {any} handler | ||
* | ||
* @memberof Extension | ||
*/ | ||
_add(type, handler) { | ||
if (this._types.indexOf(type) === -1) { | ||
let error = new Errors.HemeraError('Extension type is unknown', { | ||
const error = new Errors.HemeraError('Extension type is unknown', { | ||
type, | ||
@@ -55,9 +41,2 @@ handler | ||
/** | ||
* | ||
* | ||
* @param {any} handler | ||
* | ||
* @memberOf Extension | ||
*/ | ||
add(type, handler) { | ||
@@ -71,14 +50,12 @@ if (Array.isArray(handler)) { | ||
/** | ||
* | ||
* @param {*} e | ||
*/ | ||
static build(e) { | ||
const extensions = new ExtensionManager() | ||
extensions.onClientPreRequest = e.onClientPreRequest.slice() | ||
extensions.onClientPostRequest = e.onClientPostRequest.slice() | ||
extensions.onAct = e.onAct.slice() | ||
extensions.onActFinished = e.onActFinished.slice() | ||
extensions.onServerPreHandler = e.onServerPreHandler.slice() | ||
extensions.onServerPreRequest = e.onServerPreRequest.slice() | ||
extensions.onServerPreResponse = e.onServerPreResponse.slice() | ||
extensions.preHandler = e.preHandler.slice() | ||
extensions.onRequest = e.onRequest.slice() | ||
extensions.onSend = e.onSend.slice() | ||
extensions.onResponse = e.onResponse.slice() | ||
extensions.onError = e.onError.slice() | ||
@@ -85,0 +62,0 @@ return extensions |
@@ -31,2 +31,6 @@ 'use strict' | ||
function responseExtIterator(fn, state, next) { | ||
return fn(state, state.reply, next) | ||
} | ||
function serverExtIterator(fn, state, next) { | ||
@@ -36,2 +40,6 @@ return fn(state, state.request, state.reply, next) | ||
function serverOnErrorIterator(fn, state, next) { | ||
return fn(state, state.reply.payload, state.reply.error, next) | ||
} | ||
function clientExtIterator(fn, state, next) { | ||
@@ -41,2 +49,8 @@ return fn(state, next) | ||
module.exports = { extRunner, serverExtIterator, clientExtIterator } | ||
module.exports = { | ||
extRunner, | ||
serverExtIterator, | ||
clientExtIterator, | ||
responseExtIterator, | ||
serverOnErrorIterator | ||
} |
@@ -14,2 +14,3 @@ 'use strict' | ||
const Errors = require('./errors') | ||
const { sAddReceivedMsg } = require('./symbols') | ||
@@ -19,3 +20,3 @@ /** | ||
* - Tracing | ||
* - Check for max recusion error | ||
* - Check for recusion error | ||
* - Build request message | ||
@@ -26,8 +27,8 @@ * | ||
*/ | ||
function onClientPreRequest(context, next) { | ||
let pattern = context._pattern | ||
function onAct(context, next) { | ||
const pattern = context._pattern | ||
let parentContext = context._parentContext | ||
let cleanPattern = context._cleanPattern | ||
let currentTime = Util.nowHrTime() | ||
const parentContext = context._parentContext | ||
const cleanPattern = context._cleanPattern | ||
const currentTime = Util.nowHrTime() | ||
@@ -48,4 +49,3 @@ // shared context | ||
} | ||
context.trace$.parentSpanId = | ||
pattern.trace$.parentSpanId || parentContext.trace$.spanId | ||
context.trace$.parentSpanId = pattern.trace$.parentSpanId || parentContext.trace$.spanId | ||
} else { | ||
@@ -67,3 +67,3 @@ context.trace$ = { | ||
if (context.meta$ && context.meta$.referrers) { | ||
var count = context.meta$.referrers[callSignature] | ||
let count = context.meta$.referrers[callSignature] | ||
count += 1 | ||
@@ -118,3 +118,3 @@ context.meta$.referrers[callSignature] = count | ||
// build msg | ||
let message = { | ||
const message = { | ||
pattern: cleanPattern, | ||
@@ -129,4 +129,2 @@ meta: context.meta$, | ||
context.emit('clientPreRequest', context) | ||
next() | ||
@@ -142,4 +140,4 @@ } | ||
*/ | ||
function onClientPostRequest(context, next) { | ||
let msg = context.response.payload | ||
function onActFinished(context, next) { | ||
const msg = context.response.payload | ||
@@ -169,3 +167,3 @@ // pass to act context | ||
pattern: context.trace$.method, | ||
responseTime: context.trace$.duration / 1e6 | ||
responseTime: context.trace$.duration | ||
}, | ||
@@ -179,3 +177,3 @@ 'Request completed' | ||
pattern: context.trace$.method, | ||
responseTime: context.trace$.duration / 1e6 | ||
responseTime: context.trace$.duration | ||
}, | ||
@@ -186,4 +184,2 @@ 'Request completed' | ||
context.emit('clientPostRequest', context) | ||
next() | ||
@@ -202,4 +198,4 @@ } | ||
*/ | ||
function onServerPreRequest(context, req, reply, next) { | ||
let m = context._serverDecoder(context.request.payload) | ||
function onRequest(context, req, reply, next) { | ||
const m = context._serverDecoder(context.request.payload) | ||
@@ -211,3 +207,3 @@ if (m.error) { | ||
let msg = m.value | ||
const msg = m.value | ||
@@ -227,4 +223,6 @@ if (msg) { | ||
context._pattern = context.request.payload.pattern | ||
// find matched action | ||
context.matchedAction = context._router.lookup(context._pattern) | ||
// We have to remove the pattern manually when maxMessages$ was received. | ||
@@ -234,7 +232,4 @@ // This is required because NATS unsubscribe events is fired too early. | ||
if (context.matchedAction !== null) { | ||
context.matchedAction._receivedMsg++ | ||
if ( | ||
context.matchedAction._receivedMsg === | ||
context.matchedAction.transport.maxMessages | ||
) { | ||
context.matchedAction[sAddReceivedMsg]++ | ||
if (context.matchedAction[sAddReceivedMsg] === context.matchedAction.transport.maxMessages) { | ||
// we only need to remove the pattern because the subscription is unsubscribed by nats driver automatically | ||
@@ -247,3 +242,2 @@ context.cleanTopic(context._topic) | ||
} | ||
context.emit('serverPreRequest', context) | ||
@@ -277,3 +271,3 @@ if (context._config.traceLog === true) { | ||
function onServerPreResponse(context, req, reply, next) { | ||
function onSend(context, req, reply, next) { | ||
if (context._config.traceLog === true) { | ||
@@ -307,3 +301,2 @@ context.log = context.log.child({ | ||
* Only validate when: | ||
* - no error was set before | ||
* - pattern could be resolved | ||
@@ -316,5 +309,5 @@ * - schemaCompiler was found | ||
*/ | ||
function onServerPreRequestSchemaValidation(context, req, reply, next) { | ||
function onRequestSchemaValidation(context, req, reply, next) { | ||
if (context.matchedAction && context._schemaCompiler) { | ||
const schema = context.matchedAction.schema | ||
const { schema } = context.matchedAction | ||
const ret = context._schemaCompiler(schema)(req.payload.pattern) | ||
@@ -333,5 +326,7 @@ if (ret) { | ||
return | ||
} else if (ret.error) { | ||
} | ||
if (ret.error) { | ||
return next(ret.error) | ||
} else if (ret.value) { | ||
} | ||
if (ret.value) { | ||
req.payload.pattern = ret.value | ||
@@ -357,9 +352,5 @@ return next() | ||
*/ | ||
function onServerPreResponseSchemaValidation(context, req, reply, next) { | ||
if ( | ||
!reply.error && | ||
context.matchedAction && | ||
context._responseSchemaCompiler | ||
) { | ||
const schema = context.matchedAction.schema | ||
function onSendSchemaValidation(context, req, reply, next) { | ||
if (!reply.error && context.matchedAction && context._responseSchemaCompiler) { | ||
const { schema } = context.matchedAction | ||
const ret = context._responseSchemaCompiler(schema)(reply.payload) | ||
@@ -384,7 +375,9 @@ if (ret) { | ||
return | ||
} else if (ret.error) { | ||
} | ||
if (ret.error) { | ||
reply.error = ret.error | ||
next(ret.error) | ||
return | ||
} else if (ret.value) { | ||
} | ||
if (ret.value) { | ||
reply.payload = ret.value | ||
@@ -407,7 +400,8 @@ next() | ||
*/ | ||
function onServerPreRequestLoadTest(context, req, reply, next) { | ||
function onRequestLoadTest(context, req, reply, next) { | ||
if (context._config.load.checkPolicy === true) { | ||
const error = context._loadPolicy.check() | ||
if (error) { | ||
return next(new Errors.ProcessLoadError(error.message, error.data)) | ||
next(new Errors.ProcessLoadError(error.message, error.data)) | ||
return | ||
} | ||
@@ -419,12 +413,5 @@ } | ||
module.exports.onClientPreRequest = [onClientPreRequest] | ||
module.exports.onClientPostRequest = [onClientPostRequest] | ||
module.exports.onServerPreResponse = [ | ||
onServerPreResponseSchemaValidation, | ||
onServerPreResponse | ||
] | ||
module.exports.onServerPreRequest = [ | ||
onServerPreRequest, | ||
onServerPreRequestSchemaValidation, | ||
onServerPreRequestLoadTest | ||
] | ||
module.exports.onAct = [onAct] | ||
module.exports.onActFinished = [onActFinished] | ||
module.exports.onSend = [onSendSchemaValidation, onSend] | ||
module.exports.onRequest = [onRequest, onRequestSchemaValidation, onRequestLoadTest] |
413
lib/index.js
@@ -17,3 +17,2 @@ 'use strict' | ||
const NATS = require('nats') | ||
const EventEmitter = require('events') | ||
const Bloomrun = require('bloomrun') | ||
@@ -27,9 +26,6 @@ const Errio = require('errio') | ||
const Avvio = require('avvio') | ||
const Stream = require('stream').Stream | ||
const { Stream } = require('stream') | ||
const runExt = require('./extensionRunner').extRunner | ||
const serverExtIterator = require('./extensionRunner').serverExtIterator | ||
const clientExtIterator = require('./extensionRunner').clientExtIterator | ||
const Errors = require('./errors') | ||
const Symbols = require('./symbols') | ||
const Util = require('./util') | ||
@@ -48,9 +44,6 @@ const NatsTransport = require('./transport') | ||
const ExtensionManager = require('./extensionManager') | ||
const { sChildren, sRegisteredPlugins } = require('./symbols') | ||
const { serverExtIterator, clientExtIterator } = require('./extensionRunner') | ||
const natsConnCodes = [ | ||
NATS.CONN_ERR, | ||
NATS.SECURE_CONN_REQ, | ||
NATS.NON_SECURE_CONN_REQ, | ||
NATS.CLIENT_CERT_REQ | ||
] | ||
const natsConnCodes = [NATS.CONN_ERR, NATS.SECURE_CONN_REQ, NATS.NON_SECURE_CONN_REQ, NATS.CLIENT_CERT_REQ] | ||
@@ -60,3 +53,3 @@ /** | ||
*/ | ||
class Hemera extends EventEmitter { | ||
class Hemera { | ||
/** | ||
@@ -71,4 +64,2 @@ * Creates an instance of Hemera | ||
constructor(transport, params) { | ||
super() | ||
const config = Joi.validate(params || {}, ConfigScheme) | ||
@@ -118,2 +109,3 @@ if (config.error) { | ||
this._responseSchemaCompiler = null | ||
this._errorHandler = null | ||
this._idGenerator = Util.randomId | ||
@@ -139,18 +131,6 @@ | ||
this._extensionManager = new ExtensionManager() | ||
this._extensionManager.add( | ||
'onClientPreRequest', | ||
DefaultExtensions.onClientPreRequest | ||
) | ||
this._extensionManager.add( | ||
'onClientPostRequest', | ||
DefaultExtensions.onClientPostRequest | ||
) | ||
this._extensionManager.add( | ||
'onServerPreRequest', | ||
DefaultExtensions.onServerPreRequest | ||
) | ||
this._extensionManager.add( | ||
'onServerPreResponse', | ||
DefaultExtensions.onServerPreResponse | ||
) | ||
this._extensionManager.add('onAct', DefaultExtensions.onAct) | ||
this._extensionManager.add('onActFinished', DefaultExtensions.onActFinished) | ||
this._extensionManager.add('onRequest', DefaultExtensions.onRequest) | ||
this._extensionManager.add('onSend', DefaultExtensions.onSend) | ||
@@ -170,6 +150,6 @@ this._configureLogger() | ||
this[Symbols.childrenKey] = [] | ||
this[Symbols.registeredPlugins] = [] | ||
this[sChildren] = [] | ||
this[sRegisteredPlugins] = [] | ||
this._avvio.override = (hemera, plugin, opts) => { | ||
this._avvio.override = (hemera, plugin) => { | ||
const pluginMeta = this.getPluginMeta(plugin) | ||
@@ -179,3 +159,3 @@ | ||
if (pluginMeta.name) { | ||
hemera[Symbols.registeredPlugins].push(pluginMeta.name) | ||
hemera[sRegisteredPlugins].push(pluginMeta.name) | ||
} | ||
@@ -192,4 +172,4 @@ hemera.checkPluginDependencies(plugin) | ||
hemera[Symbols.childrenKey].push(instance) | ||
instance[Symbols.childrenKey] = [] | ||
hemera[sChildren].push(instance) | ||
instance[sChildren] = [] | ||
@@ -200,13 +180,9 @@ if (pluginMeta && pluginMeta.name && hemera._config.childLogger) { | ||
instance[Symbols.registeredPlugins] = Object.create( | ||
hemera[Symbols.registeredPlugins] | ||
) | ||
instance[sRegisteredPlugins] = Object.create(hemera[sRegisteredPlugins]) | ||
// inherit all extensions | ||
instance._extensionManager = ExtensionManager.build( | ||
hemera._extensionManager | ||
) | ||
instance._extensionManager = ExtensionManager.build(hemera._extensionManager) | ||
// decorate root instance. All instances will have access | ||
instance.decorate = function() { | ||
instance.decorate = function decorate() { | ||
hemera.decorate.apply(this._root, arguments) | ||
@@ -246,6 +222,7 @@ return instance | ||
_registerErrors() { | ||
for (let error in Hemera.errors) { | ||
for (const error in Hemera.errors) { | ||
Errio.register(Hemera.errors[error]) | ||
} | ||
} | ||
/** | ||
@@ -263,2 +240,3 @@ * | ||
} | ||
/** | ||
@@ -289,2 +267,3 @@ * | ||
} | ||
/** | ||
@@ -432,3 +411,3 @@ * | ||
/** | ||
* Create a custom super error object without to start hemera | ||
* Create a custom super error object | ||
* | ||
@@ -449,3 +428,19 @@ * @readonly | ||
} | ||
/** | ||
* Create a custom super error object | ||
* | ||
* @param {any} name | ||
* @returns | ||
* | ||
* @memberOf Hemera | ||
*/ | ||
createError(name) { | ||
const ctor = SuperError.subclass(name) | ||
// Register the class with Errio | ||
Errio.register(ctor) | ||
return ctor | ||
} | ||
/** | ||
* Add an onAdd handler | ||
@@ -504,5 +499,3 @@ * | ||
if (typeof fn !== 'function') { | ||
throw new Errors.HemeraError( | ||
'ResponseSchemaCompiler handler must be a function' | ||
) | ||
throw new Errors.HemeraError('ResponseSchemaCompiler handler must be a function') | ||
} | ||
@@ -514,2 +507,11 @@ this._responseSchemaCompiler = fn | ||
setErrorHandler(fn) { | ||
if (typeof fn !== 'function') { | ||
throw new Errors.HemeraError('ErrorHandler handler must be a function') | ||
} | ||
this._errorHandler = fn | ||
return this | ||
} | ||
/** | ||
@@ -521,2 +523,3 @@ * Exit the process | ||
fatal() { | ||
// eslint-disable-next-line no-process-exit | ||
this.close(() => process.exit(1)) | ||
@@ -614,3 +617,3 @@ } | ||
} | ||
const dependencies = pluginMeta.dependencies | ||
const { dependencies } = pluginMeta | ||
if (!dependencies) { | ||
@@ -620,11 +623,7 @@ return | ||
if (!Array.isArray(dependencies)) { | ||
throw new Errors.HemeraError( | ||
'Plugin dependencies must be an array of strings' | ||
) | ||
throw new Errors.HemeraError('Plugin dependencies must be an array of strings') | ||
} | ||
dependencies.forEach(dependency => { | ||
if (this[Symbols.registeredPlugins].indexOf(dependency) === -1) { | ||
throw new Errors.HemeraError( | ||
`The dependency '${dependency}' is not registered` | ||
) | ||
if (this[sRegisteredPlugins].indexOf(dependency) === -1) { | ||
throw new Errors.HemeraError(`The dependency '${dependency}' is not registered`) | ||
} | ||
@@ -645,3 +644,3 @@ }) | ||
} | ||
const decorators = pluginMeta.decorators | ||
const { decorators } = pluginMeta | ||
if (!decorators) { | ||
@@ -651,11 +650,7 @@ return | ||
if (!Array.isArray(decorators)) { | ||
throw new Errors.HemeraError( | ||
'Plugin decorators must be an array of strings' | ||
) | ||
throw new Errors.HemeraError('Plugin decorators must be an array of strings') | ||
} | ||
for (let i = 0; i < decorators.length; i++) { | ||
if (!(decorators[i] in this)) { | ||
throw new Errors.HemeraError( | ||
`The decorator dependency '${decorators[i]}' is not registered` | ||
) | ||
throw new Errors.HemeraError(`The decorator dependency '${decorators[i]}' is not registered`) | ||
} | ||
@@ -681,19 +676,4 @@ } | ||
/** | ||
* Create a custom super error object in a running hemera instance | ||
* | ||
* @param {any} name | ||
* @returns | ||
* | ||
* @memberOf Hemera | ||
*/ | ||
createError(name) { | ||
const ctor = SuperError.subclass(name) | ||
// Register the class with Errio | ||
Errio.register(ctor) | ||
return ctor | ||
} | ||
/** | ||
* | ||
* | ||
* @param {Function} cb | ||
@@ -741,7 +721,3 @@ * | ||
this._transport.driver.on('close', () => { | ||
const error = new Errors.HemeraError('NATS connection closed!') | ||
this.log.error(error) | ||
// when an 'error' handler was registered no error is thrown | ||
// but you have to handle it by yourself | ||
this.emit('error', error) | ||
this.log.error(new Errors.HemeraError('NATS connection closed!')) | ||
}) | ||
@@ -826,6 +802,6 @@ | ||
const topic = addDefinition.transport.topic | ||
const maxMessages = addDefinition.transport.maxMessages | ||
const { topic } = addDefinition.transport | ||
const { maxMessages } = addDefinition.transport | ||
const queue = addDefinition.transport.queue || `queue.${topic}` | ||
const pubsub = addDefinition.transport.pubsub | ||
const { pubsub } = addDefinition.transport | ||
@@ -835,17 +811,14 @@ // avoid duplicate subscribers of the emit stream | ||
if (self._topics.has(topic)) { | ||
self.log.debug(`Topic '${topic}' was already subscribed!`) | ||
return 0 | ||
} | ||
let handler = (request, replyTo) => { | ||
const handler = (request, replyTo) => { | ||
// create new execution context | ||
let hemera = self.createContext() | ||
// this will also encapsulate a topic to the plugin | ||
const hemera = self.createContext() | ||
hemera._topic = topic | ||
hemera.request = new ServerRequest(request) | ||
hemera.response = new ServerResponse(replyTo) | ||
hemera.reply = new Reply( | ||
hemera.request, | ||
hemera.response, | ||
hemera, | ||
hemera.log | ||
) | ||
hemera.reply = new Reply(hemera.request, hemera.response, hemera, hemera.log) | ||
hemera._pattern = null | ||
@@ -857,8 +830,10 @@ hemera._isServer = true | ||
runExt( | ||
hemera._extensionManager.onServerPreRequest, | ||
serverExtIterator, | ||
hemera, | ||
err => hemera._onServerPreRequestCompleted(err) | ||
) | ||
if (hemera._extensionManager.onRequest.length) { | ||
runExt(hemera._extensionManager.onRequest, serverExtIterator, hemera, err => | ||
hemera._onRequestCompleted(err) | ||
) | ||
return | ||
} | ||
hemera._onRequestCompleted() | ||
} | ||
@@ -876,3 +851,3 @@ | ||
} else { | ||
// queue group names allow load balancing of services | ||
// queue group names allow load balancing (random) of services | ||
return self._transport.subscribe( | ||
@@ -895,12 +870,10 @@ topic, | ||
*/ | ||
_onServerPreRequestCompleted(extensionError) { | ||
_onRequestCompleted(extensionError) { | ||
const self = this | ||
if (extensionError) { | ||
const internalError = new Errors.HemeraError( | ||
'onServerPreRequest extension', | ||
self.errorDetails | ||
).causedBy(extensionError) | ||
const internalError = new Errors.HemeraError('onRequest extension', self.errorDetails).causedBy( | ||
extensionError | ||
) | ||
self.log.error(internalError) | ||
self.emit('serverResponseError', extensionError) | ||
self.reply.isError = true | ||
@@ -913,20 +886,12 @@ self.reply.send(extensionError) | ||
if (self.matchedAction) { | ||
self.emit('serverPreHandler', self) | ||
if (self._extensionManager.onServerPreHandler.length) { | ||
runExt( | ||
self._extensionManager.onServerPreHandler, | ||
serverExtIterator, | ||
self, | ||
err => self._onServerPreHandlerCompleted(err) | ||
if (self._extensionManager.preHandler.length) { | ||
runExt(self._extensionManager.preHandler, serverExtIterator, self, err => | ||
self._preHandlerCompleted(err) | ||
) | ||
} else { | ||
self._onServerPreHandlerCompleted() | ||
self._preHandlerCompleted() | ||
} | ||
} else { | ||
const internalError = new Errors.PatternNotFound( | ||
'No action found for this pattern', | ||
self.errorDetails | ||
) | ||
const internalError = new Errors.PatternNotFound('No action found for this pattern', self.errorDetails) | ||
self.log.error(internalError) | ||
self.emit('serverResponseError', internalError) | ||
self.reply.isError = true | ||
@@ -943,12 +908,10 @@ self.reply.send(internalError) | ||
*/ | ||
_onServerPreHandlerCompleted(extensionError) { | ||
_preHandlerCompleted(extensionError) { | ||
const self = this | ||
if (extensionError) { | ||
const internalError = new Errors.HemeraError( | ||
'onServerPreHandler extension', | ||
self.errorDetails | ||
).causedBy(extensionError) | ||
const internalError = new Errors.HemeraError('preHandler extension', self.errorDetails).causedBy( | ||
extensionError | ||
) | ||
self.log.error(internalError) | ||
self.emit('serverResponseError', extensionError) | ||
self.reply.isError = true | ||
@@ -960,5 +923,3 @@ self.reply.send(extensionError) | ||
// action middleware | ||
self.matchedAction.run(self.request, self.reply, err => | ||
self._afterMiddlewareHandler(err) | ||
) | ||
self.matchedAction.run(self.request, self.reply, err => self._afterMiddlewareHandler(err)) | ||
} | ||
@@ -976,8 +937,4 @@ | ||
if (err) { | ||
const internalError = new Errors.HemeraError( | ||
'Action middleware', | ||
self.errorDetails | ||
).causedBy(err) | ||
const internalError = new Errors.HemeraError('Action middleware', self.errorDetails).causedBy(err) | ||
self.log.error(internalError) | ||
self.emit('serverResponseError', err) | ||
self.reply.isError = true | ||
@@ -990,5 +947,3 @@ self.reply.send(err) | ||
const isPromise = result && typeof result.then === 'function' | ||
if (isPromise) { | ||
// avoid to create a seperate promise | ||
if (result && typeof result.then === 'function') { | ||
// eslint-disable-next-line promise/catch-or-return | ||
@@ -1013,3 +968,3 @@ result.then( | ||
const self = this | ||
let action = self.matchedAction.action.bind(self) | ||
const action = self.matchedAction.action.bind(self) | ||
@@ -1046,3 +1001,3 @@ // if request type is 'pubsub' we don't have to reply back | ||
if (!topic) { | ||
let error = new Errors.HemeraError('The sid or topic name is required') | ||
const error = new Errors.HemeraError('The sid or topic name is required') | ||
self.log.error(error) | ||
@@ -1052,5 +1007,11 @@ throw error | ||
// when sid was passed | ||
if (typeof topic !== 'string' && typeof topic !== 'number') { | ||
const error = new Errors.HemeraError( | ||
`Topic must be from type string or number but got '${typeof topic}'` | ||
) | ||
self.log.error(error) | ||
throw error | ||
} | ||
if (typeof topic === 'string') { | ||
// when topic name was passed | ||
const subId = self._topics.get(topic) | ||
@@ -1060,2 +1021,3 @@ | ||
self._transport.unsubscribe(subId, maxMessages) | ||
self.log.debug(`Topic '${topic}' was unsubscribed!`) | ||
// we remove all subscription related to this topic | ||
@@ -1067,3 +1029,2 @@ this.cleanTopic(topic) | ||
self._transport.unsubscribe(topic, maxMessages) | ||
return true | ||
} | ||
@@ -1085,3 +1046,4 @@ | ||
this.list().forEach(add => { | ||
if (add.pattern.topic === topic) { | ||
// stringify to handle regular expressions | ||
if (add.transport.topic === topic) { | ||
this.router.remove(add.pattern) | ||
@@ -1102,5 +1064,3 @@ } | ||
if (!pattern) { | ||
let error = new Errors.HemeraError( | ||
'Pattern is required to define a server action' | ||
) | ||
const error = new Errors.HemeraError('Pattern is required to define a server action') | ||
this.log.error(error) | ||
@@ -1117,3 +1077,3 @@ throw error | ||
if (!pattern.topic) { | ||
let error = new Errors.HemeraError( | ||
const error = new Errors.HemeraError( | ||
'Topic is required and must be from type string', | ||
@@ -1126,7 +1086,7 @@ this.errorDetails | ||
let schema = Util.extractSchema(pattern) | ||
let patternOnly = Util.cleanPattern(pattern) | ||
const schema = Util.extractSchema(pattern) | ||
const patternOnly = Util.cleanPattern(pattern) | ||
let addDefinition = new Add({ | ||
schema: schema, | ||
const addDefinition = new Add({ | ||
schema, | ||
pattern: patternOnly, | ||
@@ -1150,7 +1110,7 @@ transport: { | ||
let handler = this._router.lookup(addDefinition.pattern) | ||
const handler = this._router.lookup(addDefinition.pattern) | ||
// check if pattern is already registered | ||
if (this._config.bloomrun.lookupBeforeAdd && handler) { | ||
let error = new Errors.HemeraError('Pattern is already in use', { | ||
const error = new Errors.HemeraError('Pattern is already in use', { | ||
pattern: addDefinition.pattern | ||
@@ -1170,3 +1130,3 @@ }) | ||
// it's not possible to susbcribe to the same topic with different transport options | ||
// because we use one subscription for the topic | ||
// because we use one NATS subscription for the topic | ||
const def = this._checkForTransportCollision(addDefinition) | ||
@@ -1178,5 +1138,3 @@ if (def) { | ||
) | ||
throw new Errors.HemeraError( | ||
'Topic is already registered with special transport options' | ||
) | ||
throw new Errors.HemeraError('Topic is already registered with special transport options') | ||
} | ||
@@ -1193,3 +1151,4 @@ | ||
addDefinition.sid = sid | ||
this._topics.set(addDefinition.pattern.topic, sid) | ||
// stringify to handle regular expressions | ||
this._topics.set(addDefinition.transport.topic, sid) | ||
} | ||
@@ -1201,2 +1160,3 @@ | ||
} | ||
/** | ||
@@ -1220,2 +1180,3 @@ * Run all onAdd handlers in serie | ||
} | ||
/** | ||
@@ -1234,8 +1195,4 @@ * Check if a topic was already registered with different transport | ||
// different transport options | ||
if (addDefinition.pattern.topic === def.pattern.topic) { | ||
if ( | ||
mT1.maxMessages !== mT2.maxMessages || | ||
mT1.queue !== mT2.queue || | ||
mT1.pubsub !== mT2.pubsub | ||
) { | ||
if (addDefinition.transport.topic === def.transport.topic) { | ||
if (mT1.maxMessages !== mT2.maxMessages || mT1.queue !== mT2.queue || mT1.pubsub !== mT2.pubsub) { | ||
return def | ||
@@ -1269,8 +1226,6 @@ } | ||
if (self.response.error) { | ||
let internalError = new Errors.ParseError( | ||
'Client payload decoding', | ||
self.errorDetails | ||
).causedBy(self.response.error) | ||
const internalError = new Errors.ParseError('Client payload decoding', self.errorDetails).causedBy( | ||
self.response.error | ||
) | ||
self.log.error(internalError) | ||
self.emit('clientResponseError', self.response.error) | ||
self._execute(self.response.error) | ||
@@ -1280,8 +1235,10 @@ return | ||
runExt( | ||
self._extensionManager.onClientPostRequest, | ||
clientExtIterator, | ||
self, | ||
err => self._onClientPostRequestCompleted(err) | ||
) | ||
if (self._extensionManager.onActFinished.length) { | ||
runExt(self._extensionManager.onActFinished, clientExtIterator, self, err => | ||
self._onActFinishedCallback(err) | ||
) | ||
return | ||
} | ||
self._onActFinishedCallback() | ||
} | ||
@@ -1295,13 +1252,11 @@ | ||
*/ | ||
_onClientPostRequestCompleted(extensionError) { | ||
_onActFinishedCallback(extensionError) { | ||
const self = this | ||
if (extensionError) { | ||
let error = self.getRootError(extensionError) | ||
const internalError = new Errors.HemeraError( | ||
'onClientPostRequest extension', | ||
self.errorDetails | ||
).causedBy(extensionError) | ||
const error = self.getRootError(extensionError) | ||
const internalError = new Errors.HemeraError('onActFinished extension', self.errorDetails).causedBy( | ||
extensionError | ||
) | ||
self.log.error(internalError) | ||
self.emit('clientResponseError', extensionError) | ||
self._execute(error) | ||
@@ -1312,4 +1267,3 @@ return | ||
if (self.response.payload.error) { | ||
let error = Errio.fromObject(self.response.payload.error) | ||
self.emit('clientResponseError', error) | ||
const error = Errio.fromObject(self.response.payload.error) | ||
self._execute(error) | ||
@@ -1333,5 +1287,3 @@ return | ||
if (!pattern) { | ||
let error = new Errors.HemeraError( | ||
'Pattern is required to start a request' | ||
) | ||
const error = new Errors.HemeraError('Pattern is required to start a request') | ||
this.log.error(error) | ||
@@ -1348,3 +1300,3 @@ throw error | ||
if (!pattern.topic) { | ||
let error = new Errors.HemeraError( | ||
const error = new Errors.HemeraError( | ||
'Topic is required and must be from type string', | ||
@@ -1358,3 +1310,3 @@ this.errorDetails | ||
// create new execution context | ||
let hemera = this.createContext() | ||
const hemera = this.createContext() | ||
hemera._pattern = pattern | ||
@@ -1372,8 +1324,7 @@ hemera._parentContext = this | ||
hemera._execute = cb.bind(hemera) | ||
runExt( | ||
hemera._extensionManager.onClientPreRequest, | ||
clientExtIterator, | ||
hemera, | ||
err => hemera._onClientPreRequestCompleted(err) | ||
) | ||
if (hemera._extensionManager.onAct.length) { | ||
runExt(hemera._extensionManager.onAct, clientExtIterator, hemera, err => hemera._onActCallback(err)) | ||
return | ||
} | ||
hemera._onActCallback() | ||
} else { | ||
@@ -1390,8 +1341,7 @@ const evaluateResult = new Promise((resolve, reject) => { | ||
runExt( | ||
hemera._extensionManager.onClientPreRequest, | ||
clientExtIterator, | ||
hemera, | ||
err => hemera._onClientPreRequestCompleted(err) | ||
) | ||
if (hemera._extensionManager.onAct.length) { | ||
runExt(hemera._extensionManager.onAct, clientExtIterator, hemera, err => hemera._onActCallback(err)) | ||
} else { | ||
hemera._onActCallback() | ||
} | ||
@@ -1430,5 +1380,5 @@ return evaluateResult.then(resp => { | ||
*/ | ||
_onClientPreRequestCompleted(err) { | ||
_onActCallback(err) { | ||
const self = this | ||
let m = self._clientEncoder(self._message) | ||
const m = self._clientEncoder(self._message) | ||
@@ -1441,7 +1391,4 @@ self.request.payload = m.value | ||
self.request.error = m.error | ||
let error = new Errors.ParseError('Client payload encoding').causedBy( | ||
m.error | ||
) | ||
const error = new Errors.ParseError('Client payload encoding').causedBy(m.error) | ||
self.log.error(error) | ||
self.emit('clientResponseError', m.error) | ||
self._execute(m.error) | ||
@@ -1452,10 +1399,7 @@ return | ||
if (err) { | ||
let error = self.getRootError(err) | ||
const error = self.getRootError(err) | ||
self.request.payload = null | ||
self.request.error = error | ||
const internalError = new Errors.HemeraError( | ||
'onClientPreRequest extension' | ||
).causedBy(err) | ||
const internalError = new Errors.HemeraError('onAct extension').causedBy(err) | ||
self.log.error(internalError) | ||
self.emit('clientResponseError', error) | ||
self._execute(error) | ||
@@ -1467,5 +1411,3 @@ return | ||
if (self._pattern.pubsub$ === true) { | ||
self._transport.send(self._pattern.topic, self.request.payload, err => | ||
self._execute(err) | ||
) | ||
self._transport.send(self._pattern.topic, self.request.payload, err => self._execute(err)) | ||
} else { | ||
@@ -1480,17 +1422,10 @@ const optOptions = { | ||
// support maxMessages$ -1 | ||
if ( | ||
typeof self._pattern.maxMessages$ === 'number' || | ||
self._pattern.expectedMessages$ > 0 | ||
) { | ||
if (typeof self._pattern.maxMessages$ === 'number' || self._pattern.expectedMessages$ > 0) { | ||
// we can't receive more messages than "expected" messages | ||
// the inbox is closed automatically | ||
optOptions.max = | ||
self._pattern.expectedMessages$ || self._pattern.maxMessages$ | ||
optOptions.max = self._pattern.expectedMessages$ || self._pattern.maxMessages$ | ||
} | ||
// send request | ||
self.sid = self._transport.sendRequest( | ||
self._pattern.topic, | ||
self.request.payload, | ||
optOptions, | ||
resp => self._sendRequestHandler(resp) | ||
self.sid = self._transport.sendRequest(self._pattern.topic, self.request.payload, optOptions, resp => | ||
self._sendRequestHandler(resp) | ||
) | ||
@@ -1505,2 +1440,3 @@ | ||
} | ||
/** | ||
@@ -1535,10 +1471,10 @@ * Handle the timeout when a pattern could not be resolved. Can have different reasons: | ||
self.response.error = error | ||
self.emit('clientResponseError', error) | ||
runExt( | ||
self._extensionManager.onClientPostRequest, | ||
clientExtIterator, | ||
self, | ||
err => self._onClientTimeoutPostRequestCompleted(err) | ||
) | ||
if (self._extensionManager.onActFinished.length) { | ||
runExt(self._extensionManager.onActFinished, clientExtIterator, self, err => | ||
self._onActTimeoutCallback(err) | ||
) | ||
return | ||
} | ||
self._onActTimeoutCallback() | ||
} | ||
@@ -1552,13 +1488,10 @@ | ||
*/ | ||
_onClientTimeoutPostRequestCompleted(err) { | ||
_onActTimeoutCallback(err) { | ||
const self = this | ||
if (err) { | ||
let error = self.getRootError(err) | ||
const internalError = new Errors.HemeraError( | ||
'onClientPostRequest extension' | ||
).causedBy(err) | ||
const error = self.getRootError(err) | ||
const internalError = new Errors.HemeraError('onActFinished extension').causedBy(err) | ||
self.log.error(internalError) | ||
self.response.error = error | ||
self.emit('clientResponseError', error) | ||
} | ||
@@ -1600,3 +1533,3 @@ | ||
removeAll() { | ||
for (var topic of this._topics.keys()) { | ||
for (const topic of this._topics.keys()) { | ||
this.remove(topic) | ||
@@ -1603,0 +1536,0 @@ } |
267
lib/reply.js
@@ -12,197 +12,184 @@ 'use strict' | ||
const Errio = require('errio') | ||
const Errors = require('./errors') | ||
const Errio = require('errio') | ||
const runExt = require('./extensionRunner').extRunner | ||
const serverExtIterator = require('./extensionRunner').serverExtIterator | ||
const { responseExtIterator, serverExtIterator, serverOnErrorIterator } = require('./extensionRunner') | ||
const { sReplySent, sReplyRequest, sReplyResponse, sReplyHemera } = require('./symbols') | ||
/** | ||
* @TODO rename hook to onServerSend | ||
* | ||
* @class Reply | ||
*/ | ||
class Reply { | ||
/** | ||
* Creates an instance of Reply. | ||
* @param {any} request | ||
* @param {any} response | ||
* @param {any} hemera | ||
* @param {any} logger | ||
* | ||
* @memberof Reply | ||
*/ | ||
constructor(request, response, hemera, logger) { | ||
this._request = request | ||
this._response = response | ||
this.hemera = hemera | ||
this[sReplyRequest] = request | ||
this[sReplyResponse] = response | ||
this[sReplyHemera] = hemera | ||
this.log = logger | ||
this.sent = false | ||
this[sReplySent] = false | ||
this.isError = false | ||
} | ||
/** | ||
* | ||
* | ||
* @param {any} payload | ||
* | ||
* @memberof Reply | ||
*/ | ||
set payload(value) { | ||
this._response.payload = value | ||
this[sReplyResponse].payload = value | ||
} | ||
/** | ||
* | ||
* | ||
* @readonly | ||
* | ||
* @memberof Reply | ||
*/ | ||
get payload() { | ||
return this._response.payload | ||
return this[sReplyResponse].payload | ||
} | ||
/** | ||
* Set the response error | ||
* Error can not be set twice | ||
* | ||
* @memberof Reply | ||
*/ | ||
set error(value) { | ||
this._response.error = this.hemera._attachHops( | ||
this.hemera.getRootError(value) | ||
) | ||
this[sReplyResponse].error = this[sReplyHemera]._attachHops(this[sReplyHemera].getRootError(value)) | ||
} | ||
/** | ||
* | ||
* | ||
* @readonly | ||
* | ||
* @memberof Reply | ||
*/ | ||
get error() { | ||
return this._response.error | ||
return this[sReplyResponse].error | ||
} | ||
/** | ||
* | ||
*/ | ||
next(msg) { | ||
this.sent = false | ||
this[sReplySent] = false | ||
this.send(msg) | ||
} | ||
/** | ||
* Set the response payload or error | ||
* | ||
* @param {any} msg | ||
* @memberof Reply | ||
*/ | ||
send(msg) { | ||
const self = this | ||
if (self.sent) { | ||
self.log.warn(new Errors.HemeraError('Reply already sent')) | ||
if (this[sReplySent] === true) { | ||
this.log.warn(new Errors.HemeraError('Reply already sent')) | ||
return | ||
} | ||
if (!(msg instanceof Error) && self.isError === true) { | ||
const internalError = new Errors.HemeraError( | ||
`Response error must be derivated from type 'Error' but got '${typeof msg}'` | ||
) | ||
self.log.error(internalError) | ||
return | ||
} | ||
this[sReplySent] = true | ||
self.sent = true | ||
const isNativeError = msg instanceof Error | ||
// 0, null, '' can be send | ||
if (msg !== undefined) { | ||
if (msg instanceof Error) { | ||
self.error = msg | ||
self.payload = null | ||
if (isNativeError) { | ||
this.error = msg | ||
this.payload = null | ||
} else { | ||
self.error = null | ||
self.payload = msg | ||
this.error = null | ||
this.payload = msg | ||
} | ||
} | ||
self.serverPreResponse() | ||
} | ||
/** | ||
* | ||
*/ | ||
serverPreResponse() { | ||
const self = this | ||
if (this.isError === true || isNativeError) { | ||
this._handleError(msg, () => this._onErrorHook()) | ||
return | ||
} | ||
self.hemera.emit('serverPreResponse', self.hemera) | ||
runExt( | ||
self.hemera._extensionManager.onServerPreResponse, | ||
serverExtIterator, | ||
self.hemera, | ||
err => self._onServerPreResponseCompleted(err) | ||
) | ||
this._sendHook() | ||
} | ||
/** | ||
* | ||
* @param {*} extensionError | ||
*/ | ||
_onServerPreResponseCompleted(extensionError) { | ||
const self = this | ||
if (extensionError) { | ||
_handleError(err, cb) { | ||
if (!(err instanceof Error)) { | ||
const internalError = new Errors.HemeraError( | ||
'onServerPreResponse extension', | ||
self.hemera.errorDetails | ||
).causedBy(extensionError) | ||
self.log.error(internalError) | ||
self.hemera.emit('serverResponseError', extensionError) | ||
// don't use send() here in order to avoid rexecution of serverPreResponse | ||
// and to send the "send-error" as final response | ||
self.error = extensionError | ||
`Response error must be derivated from type 'Error' but got '${typeof err}'` | ||
) | ||
this.log.error(internalError) | ||
return | ||
} | ||
if (self._response.replyTo) { | ||
const msg = self.build( | ||
self.hemera.meta$, | ||
self.hemera.trace$, | ||
self.hemera.request$ | ||
) | ||
if (this[sReplyHemera]._errorHandler) { | ||
const result = this[sReplyHemera]._errorHandler(this[sReplyHemera], err, this.reply) | ||
if (result && typeof result.then === 'function') { | ||
result | ||
.then(() => { | ||
if (cb) { | ||
cb() | ||
} | ||
}) | ||
.catch(err => this._logError(err, 'error handler')) | ||
return | ||
} | ||
} | ||
self.hemera._transport.send(self._response.replyTo, msg.value) | ||
if (cb) { | ||
cb() | ||
} | ||
} | ||
/** | ||
* | ||
* @param {*} meta$ | ||
* @param {*} trace$ | ||
* @param {*} request$ | ||
*/ | ||
build(meta$, trace$, request$) { | ||
const self = this | ||
let message = { | ||
meta: meta$ || {}, | ||
trace: trace$ || {}, | ||
request: request$, | ||
result: self.error ? null : self.payload, | ||
error: self.error ? Errio.toObject(self.error) : null | ||
_logError(err, message) { | ||
const internalError = new Errors.HemeraError(message, this[sReplyHemera].errorDetails).causedBy(err) | ||
this.log.error(internalError) | ||
} | ||
_onErrorHook() { | ||
if (this[sReplyHemera]._extensionManager.onError.length) { | ||
runExt(this[sReplyHemera]._extensionManager.onError, serverOnErrorIterator, this[sReplyHemera], err => { | ||
if (err) { | ||
this._logError(err, 'onError extension') | ||
} | ||
this._sendHook() | ||
}) | ||
return | ||
} | ||
let msg = self.hemera._serverEncoder(message) | ||
this._sendHook() | ||
} | ||
_sendHook() { | ||
if (this[sReplyHemera]._extensionManager.onSend.length) { | ||
runExt(this[sReplyHemera]._extensionManager.onSend, serverExtIterator, this[sReplyHemera], err => { | ||
if (err) { | ||
this._logError(err, 'onSend extension') | ||
// first set error has precedence | ||
if (this.error === null) { | ||
this[sReplySent] = false | ||
this.send(err) | ||
return | ||
} | ||
} | ||
this._send() | ||
}) | ||
return | ||
} | ||
this._send() | ||
} | ||
_send() { | ||
const msg = this.build(this[sReplyHemera].meta$, this[sReplyHemera].trace$, this[sReplyHemera].request$) | ||
// don't try to send encoding issues back because | ||
// it could end up in a endloss loop | ||
if (msg.error) { | ||
let internalError = new Errors.ParseError( | ||
'Server payload encoding' | ||
).causedBy(msg.error) | ||
self.log.error(internalError) | ||
self.hemera.emit('serverResponseError', msg.error) | ||
message.error = Errio.toObject(msg.error) | ||
message.result = null | ||
msg = self.hemera._serverEncoder(message) | ||
const internalError = new Errors.ParseError('Server encoding').causedBy(msg.error) | ||
this.log.error(internalError) | ||
this._handleError(msg.error) | ||
return | ||
} | ||
return msg | ||
if (this[sReplyResponse].replyTo) { | ||
this[sReplyHemera]._transport.send(this[sReplyResponse].replyTo, msg.value) | ||
} | ||
this._onResponse() | ||
} | ||
_onResponse() { | ||
if (this[sReplyHemera]._extensionManager.onResponse.length) { | ||
runExt( | ||
this[sReplyHemera]._extensionManager.onResponse, | ||
responseExtIterator, | ||
this[sReplyHemera], | ||
err => { | ||
if (err) { | ||
this._logError(err, 'onResponse extension') | ||
} | ||
} | ||
) | ||
} | ||
} | ||
build(meta$, trace$, request$) { | ||
const message = { | ||
meta: meta$ || {}, | ||
trace: trace$ || {}, | ||
request: request$, | ||
result: this.error ? null : this.payload, | ||
error: this.error ? Errio.toObject(this.error) : null | ||
} | ||
return this[sReplyHemera]._serverEncoder(message) | ||
} | ||
} | ||
module.exports = Reply |
@@ -13,4 +13,10 @@ 'use strict' | ||
module.exports = { | ||
childrenKey: Symbol('hemera.children'), | ||
registeredPlugins: Symbol('hemera.registered-plugin') | ||
sChildren: Symbol('hemera.children'), | ||
sRegisteredPlugins: Symbol('hemera.registered-plugin'), | ||
sReplySent: Symbol('hemera.reply-sent'), | ||
sReplyRequest: Symbol('hemera.reply-request'), | ||
sReplyResponse: Symbol('hemera.reply-response'), | ||
sReplyHemera: Symbol('hemera.reply-hemera'), | ||
sReplyLog: Symbol('hemera.reply-log'), | ||
sAddReceivedMsg: Symbol('hemera.add-receivedMsg') | ||
} |
@@ -12,15 +12,3 @@ 'use strict' | ||
/** | ||
* | ||
* | ||
* @class Transport | ||
*/ | ||
class NatsTransport { | ||
/** | ||
* Creates an instance of NatsTransport. | ||
* | ||
* @param {any} params | ||
* | ||
* @memberOf NatsTransport | ||
*/ | ||
constructor(params) { | ||
@@ -30,9 +18,2 @@ this.nc = params.transport | ||
/** | ||
* | ||
* | ||
* @readonly | ||
* | ||
* @memberOf NatsTransport | ||
*/ | ||
get driver() { | ||
@@ -42,9 +23,2 @@ return this.nc | ||
/** | ||
* | ||
* @param {*} sid | ||
* @param {*} timeout | ||
* @param {*} expected | ||
* @param {*} callback | ||
*/ | ||
timeout(sid, timeout, expected, callback) { | ||
@@ -54,9 +28,2 @@ return this.nc.timeout(sid, timeout, expected, callback) | ||
/** | ||
* | ||
* @param {*} subject | ||
* @param {*} msg | ||
* @param {*} optReply | ||
* @param {*} optCallback | ||
*/ | ||
send(subject, msg, optReply, optCallback) { | ||
@@ -66,9 +33,2 @@ return this.nc.publish(subject, msg, optReply, optCallback) | ||
/** | ||
* | ||
* | ||
* @returns | ||
* | ||
* @memberOf NatsTransport | ||
*/ | ||
close() { | ||
@@ -78,6 +38,2 @@ return this.nc.close() | ||
/** | ||
* | ||
* @param {*} optCallback | ||
*/ | ||
flush(optCallback) { | ||
@@ -87,8 +43,2 @@ return this.nc.flush(optCallback) | ||
/** | ||
* | ||
* @param {*} subject | ||
* @param {*} opts | ||
* @param {*} callback | ||
*/ | ||
subscribe(subject, opts, callback) { | ||
@@ -98,7 +48,2 @@ return this.nc.subscribe(subject, opts, callback) | ||
/** | ||
* | ||
* @param {*} sid | ||
* @param {*} optMax | ||
*/ | ||
unsubscribe(sid, optMax) { | ||
@@ -108,9 +53,2 @@ return this.nc.unsubscribe(sid, optMax) | ||
/** | ||
* | ||
* @param {*} subject | ||
* @param {*} optMsg | ||
* @param {*} optOptions | ||
* @param {*} callback | ||
*/ | ||
sendRequest(subject, optMsg, optOptions, callback) { | ||
@@ -117,0 +55,0 @@ return this.nc.request(subject, optMsg, optOptions, callback) |
133
lib/util.js
'use strict' | ||
const NUID = require('nuid') | ||
/** | ||
@@ -12,12 +14,2 @@ * Copyright 2016-present, Dustin Deus (deusdustin@gmail.com) | ||
const lut = [] | ||
for (let i = 0; i < 256; i++) { | ||
lut[i] = (i < 16 ? '0' : '') + i.toString(16) | ||
} | ||
const NS_PER_SEC = 1e9 | ||
/** | ||
* @class Util | ||
*/ | ||
class Util { | ||
@@ -27,11 +19,3 @@ static escapeTopicForRegExp(string) { | ||
} | ||
/** | ||
* | ||
* | ||
* @static | ||
* @param {any} subject | ||
* @returns | ||
* | ||
* @memberof Util | ||
*/ | ||
static natsWildcardToRegex(subject) { | ||
@@ -42,53 +26,27 @@ if (subject instanceof RegExp) { | ||
let hasTokenWildcard = subject.toString().indexOf('*') > -1 | ||
let hasFullWildcard = subject.toString().indexOf('>') > -1 | ||
const hasTokenWildcard = subject.toString().indexOf('*') > -1 | ||
const hasFullWildcard = subject.toString().indexOf('>') > -1 | ||
let sub = subject | ||
if (hasFullWildcard) { | ||
subject = Util.escapeTopicForRegExp(subject).replace( | ||
'>', | ||
'[a-zA-Z0-9-.]+' | ||
) | ||
return new RegExp('^' + subject + '$', 'i') | ||
const fullWildcard = Util.escapeTopicForRegExp(subject).replace('>', '[a-zA-Z0-9-.]+') | ||
sub = new RegExp(`^${fullWildcard}$`, 'i') | ||
} else if (hasTokenWildcard) { | ||
subject = Util.escapeTopicForRegExp(subject).replace( | ||
/\*/g, | ||
'[a-zA-Z0-9-]+' | ||
) | ||
return new RegExp('^' + subject + '$', 'i') | ||
const tokenWildcard = Util.escapeTopicForRegExp(subject).replace(/\*/g, '[a-zA-Z0-9-]+') | ||
sub = new RegExp(`^${tokenWildcard}$`, 'i') | ||
} | ||
return subject | ||
return sub | ||
} | ||
/** | ||
* @returns | ||
* Fast ID generator: e7 https://jsperf.com/uuid-generator-opt/18 | ||
* @memberOf Util | ||
* Generates a unique random id | ||
* Total length of a NUID string is 22 bytes of base 36 ascii text | ||
*/ | ||
static randomId() { | ||
const d0 = (Math.random() * 0xffffffff) | 0 | ||
const d1 = (Math.random() * 0xffffffff) | 0 | ||
const d2 = (Math.random() * 0xffffffff) | 0 | ||
const d3 = (Math.random() * 0xffffffff) | 0 | ||
return ( | ||
lut[d0 & 0xff] + | ||
lut[(d0 >> 8) & 0xff] + | ||
lut[(d0 >> 16) & 0xff] + | ||
lut[(d0 >> 24) & 0xff] + | ||
lut[d1 & 0xff] + | ||
lut[(d1 >> 8) & 0xff] + | ||
lut[((d1 >> 16) & 0x0f) | 0x40] + | ||
lut[(d1 >> 24) & 0xff] + | ||
lut[(d2 & 0x3f) | 0x80] + | ||
lut[(d2 >> 8) & 0xff] + | ||
lut[(d2 >> 16) & 0xff] + | ||
lut[(d2 >> 24) & 0xff] + | ||
lut[d3 & 0xff] + | ||
lut[(d3 >> 8) & 0xff] + | ||
lut[(d3 >> 16) & 0xff] + | ||
lut[(d3 >> 24) & 0xff] | ||
) | ||
return NUID.next() | ||
} | ||
/** | ||
* Get high resolution time in nanoseconds | ||
* Get high resolution time in miliseconds | ||
* | ||
@@ -101,20 +59,14 @@ * @static | ||
static nowHrTime() { | ||
const hrtime = process.hrtime() | ||
return +hrtime[0] * NS_PER_SEC + +hrtime[1] | ||
const ts = process.hrtime() | ||
return ts[0] * 1e3 + ts[1] / 1e6 | ||
} | ||
/** | ||
* | ||
* | ||
* @static | ||
* @param {any} obj | ||
* @returns | ||
* | ||
* @memberOf Util | ||
*/ | ||
static extractSchema(obj) { | ||
if (obj === null) return obj | ||
if (obj === null) { | ||
return obj | ||
} | ||
const o = {} | ||
for (let key in obj) { | ||
for (const key in obj) { | ||
if (typeof obj[key] === 'object') { | ||
@@ -127,15 +79,11 @@ o[key] = obj[key] | ||
} | ||
/** | ||
* @static | ||
* @param {any} obj | ||
* @returns | ||
* | ||
* @memberOf Util | ||
*/ | ||
static cleanPattern(obj) { | ||
if (obj === null) return obj | ||
if (obj === null) { | ||
return obj | ||
} | ||
const o = {} | ||
for (let key in obj) { | ||
for (const key in obj) { | ||
if ( | ||
@@ -153,15 +101,10 @@ !key.endsWith('$') && | ||
/** | ||
* @static | ||
* @param {any} obj | ||
* @returns | ||
* | ||
* @memberOf Util | ||
*/ | ||
static cleanFromSpecialVars(obj) { | ||
if (obj === null) return obj | ||
if (obj === null) { | ||
return obj | ||
} | ||
const o = {} | ||
for (let key in obj) { | ||
for (const key in obj) { | ||
if (!key.endsWith('$')) { | ||
@@ -174,8 +117,2 @@ o[key] = obj[key] | ||
/** | ||
* @param {any} args | ||
* @returns | ||
* | ||
* @memberOf Util | ||
*/ | ||
static pattern(args) { | ||
@@ -187,6 +124,6 @@ if (typeof args === 'string') { | ||
const obj = Util.cleanPattern(args) | ||
let sb = [] | ||
const sb = [] | ||
for (let key in obj) { | ||
sb.push(key + ':' + obj[key]) | ||
for (const key in obj) { | ||
sb.push(`${key}:${obj[key]}`) | ||
} | ||
@@ -193,0 +130,0 @@ |
{ | ||
"name": "nats-hemera", | ||
"author": "Dustin Deus (https://github.com/StarpTech)", | ||
"version": "6.1.1", | ||
"version": "7.0.0", | ||
"main": "lib/index.js", | ||
@@ -52,3 +52,4 @@ "homepage": "https://hemerajs.github.io/hemera/", | ||
"joi": "11.1.x", | ||
"pino": "5.6.x", | ||
"nuid": "^1.0.0", | ||
"pino": "5.11.x", | ||
"super-error": "2.2.x", | ||
@@ -58,6 +59,6 @@ "tinysonic": "1.3.x" | ||
"peerDependencies": { | ||
"nats": ">= 0.8.x <= 1.x.x" | ||
"nats": ">= 0.8.x <= 1.2.x" | ||
}, | ||
"typings": "./index.d.ts", | ||
"gitHead": "d13b52980cbf947aaa88696f9dc9afeb4e2db6b3" | ||
"gitHead": "ca0731186ddc56c051acf467a1257e11307f528d" | ||
} |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
79439
11
2852
+ Addednuid@^1.0.0
+ Addedatomic-sleep@1.0.0(transitive)
+ Addednats@1.2.10(transitive)
+ Addedpino@5.11.3(transitive)
+ Addedsonic-boom@0.7.7(transitive)
- Removedend-of-stream@1.4.4(transitive)
- Removedfast-json-parse@1.0.3(transitive)
- Removednats@1.4.12(transitive)
- Removedonce@1.4.0(transitive)
- Removedpino@5.6.4(transitive)
- Removedpump@3.0.2(transitive)
- Removedsonic-boom@0.6.3(transitive)
- Removedwrappy@1.0.2(transitive)
Updatedpino@5.11.x