New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

nats-hemera

Package Overview
Dependencies
Maintainers
1
Versions
279
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

nats-hemera - npm Package Compare versions

Comparing version 1.6.2 to 2.0.0-0

38

lib/add.js

@@ -13,3 +13,2 @@ 'use strict'

const _ = require('lodash')
const Co = require('co')
const Util = require('./util')

@@ -30,3 +29,3 @@

*/
constructor (actMeta, options) {
constructor(actMeta, options) {
this.actMeta = actMeta

@@ -45,3 +44,3 @@ this.options = options

*/
_use (handler) {
_use(handler) {
this.actMeta.middleware.push(Util.toPromiseFact(handler))

@@ -58,3 +57,3 @@ }

*/
use (handler) {
use(handler) {
if (_.isArray(handler)) {

@@ -75,3 +74,3 @@ handler.forEach(h => this._use(h))

*/
end (cb) {
end(cb) {
this.action = cb

@@ -89,6 +88,10 @@ }

*/
dispatch (request, response, cb) {
Util.serial(this.middleware, (item, next) => {
item(request, response, next)
}, cb)
run(request, response, cb) {
Util.eachSeries(
this.middleware,
(item, next) => {
item(request, response, next)
},
cb
)
}

@@ -103,3 +106,3 @@

*/
get middleware () {
get middleware() {
return this.actMeta.middleware

@@ -114,3 +117,3 @@ }

*/
get schema () {
get schema() {
return this.actMeta.schema

@@ -126,3 +129,3 @@ }

*/
get pattern () {
get pattern() {
return this.actMeta.pattern

@@ -137,7 +140,4 @@ }

*/
set action (action) {
if (Util.isGeneratorFunction(action)) {
this.actMeta.action = Co.wrap(action)
this.isPromisable = true
} else if (Util.isAsyncFunction(action)) {
set action(action) {
if (Util.isAsyncFunction(action)) {
this.actMeta.action = action

@@ -157,3 +157,3 @@ this.isPromisable = true

*/
get action () {
get action() {
return this.actMeta.action

@@ -168,3 +168,3 @@ }

*/
get plugin () {
get plugin() {
return this.actMeta.plugin

@@ -171,0 +171,0 @@ }

@@ -23,3 +23,3 @@ 'use strict'

*/
constructor () {
constructor() {
this._request = {}

@@ -35,3 +35,3 @@ }

*/
get payload () {
get payload() {
return this._request.value

@@ -47,3 +47,3 @@ }

*/
get error () {
get error() {
return this._request.error

@@ -58,3 +58,3 @@ }

*/
set payload (value) {
set payload(value) {
this._request.value = value

@@ -69,3 +69,3 @@ }

*/
set error (error) {
set error(error) {
this._request.error = error

@@ -72,0 +72,0 @@ }

@@ -22,3 +22,3 @@ 'use strict'

*/
constructor () {
constructor() {
this._response = {}

@@ -34,3 +34,3 @@ }

*/
get payload () {
get payload() {
return this._response.value

@@ -45,3 +45,3 @@ }

*/
set payload (value) {
set payload(value) {
this._response.value = value

@@ -56,3 +56,3 @@ }

*/
set error (error) {
set error(error) {
this._response.error = error

@@ -68,3 +68,3 @@ }

*/
get error () {
get error() {
return this._response.error

@@ -71,0 +71,0 @@ }

@@ -22,3 +22,3 @@ 'use strict'

*/
constructor (type) {
constructor(type) {
this._type = type

@@ -34,3 +34,3 @@ this._stack = []

*/
add (step) {
add(step) {
this._stack.push(step)

@@ -47,3 +47,3 @@ return this

*/
reset (step) {
reset(step) {
this._stack = step ? [step] : []

@@ -60,3 +60,3 @@ return this

*/
first (step) {
first(step) {
this._stack.unshift(step)

@@ -71,3 +71,3 @@ return this

*/
run (msg, ctx) {
run(msg, ctx) {
let firstError = null

@@ -74,0 +74,0 @@

@@ -7,5 +7,9 @@ const Joi = require('joi')

// Max execution time of a request
timeout: Joi.number().integer().default(2000),
timeout: Joi.number()
.integer()
.default(2000),
// Max initialization time of a plugin
pluginTimeout: Joi.number().integer().default(3000),
pluginTimeout: Joi.number()
.integer()
.default(3000),
tag: Joi.string().default(''),

@@ -18,59 +22,76 @@ // Enables pretty log formatter in Pino default logger

crashOnFatal: Joi.boolean().default(true),
logLevel: Joi.any().valid(['fatal', 'error', 'warn', 'info', 'debug', 'trace', 'silent']).default('silent'),
logLevel: Joi.any()
.valid(['fatal', 'error', 'warn', 'info', 'debug', 'trace', 'silent'])
.default('silent'),
// Create a child logger per section / plugin. Only possible with default logger Pino.
childLogger: Joi.boolean().default(false),
// Max recursive method calls
maxRecursion: Joi.number().integer().default(0),
maxRecursion: Joi.number()
.integer()
.default(0),
// Custom logger
logger: Joi.object().optional(),
// The error serialization options
errio: Joi.object().keys({
// Recursively serialize and deserialize nested errors
recursive: Joi.boolean().default(true),
// Include inherited properties
inherited: Joi.boolean().default(true),
// Include stack property
stack: Joi.boolean().default(true),
// Include properties with leading or trailing underscores
private: Joi.boolean().default(false),
// Property names to exclude (low priority)
exclude: Joi.array().items(Joi.string()).default([]),
// Property names to include (high priority)
include: Joi.array().items(Joi.string()).default([])
}).default(),
errio: Joi.object()
.keys({
// Recursively serialize and deserialize nested errors
recursive: Joi.boolean().default(true),
// Include inherited properties
inherited: Joi.boolean().default(true),
// Include stack property
stack: Joi.boolean().default(true),
// Include properties with leading or trailing underscores
private: Joi.boolean().default(false),
// Property names to exclude (low priority)
exclude: Joi.array()
.items(Joi.string())
.default([]),
// Property names to include (high priority)
include: Joi.array()
.items(Joi.string())
.default([])
})
.default(),
// Pattern matching options
bloomrun: Joi.object().keys({
indexing: Joi.any().valid(['insertion', 'depth']).default('insertion'),
// Checks if the pattern is no duplicate based on to the indexing strategy
lookupBeforeAdd: Joi.boolean().default(false)
}).default(),
load: Joi.object().keys({
// Check on every request (server) if the load policy was observed,
checkPolicy: Joi.boolean().default(true),
// Should gracefully exit the process to recover from memory leaks or load, crashOnFatal must be enabled
shouldCrash: Joi.boolean().default(true),
process: Joi.object().keys({
// Frequency of load sampling in milliseconds (zero is no sampling)
sampleInterval: Joi.number().integer().default(0)
}).default(),
policy: Joi.object().keys({
// Reject requests when V8 heap is over size in bytes (zero is no max)
maxHeapUsedBytes: Joi.number().integer().default(0),
// Reject requests when process RSS is over size in bytes (zero is no max)
maxRssBytes: Joi.number().integer().default(0),
// Milliseconds of delay after which requests are rejected (zero is no max)
maxEventLoopDelay: Joi.number().integer().default(0)
}).default()
}).default(),
circuitBreaker: Joi.object().keys({
enabled: Joi.boolean().default(false),
// Minimum successes in the half-open state to change to close state
minSuccesses: Joi.number().integer().default(1),
// The duration (milliseconds) when the server is ready to accept further calls after changing to open state
halfOpenTime: Joi.number().integer().default(5000),
// Frequency of reseting the circuit breaker to close state in milliseconds
resetIntervalTime: Joi.number().integer().default(15000),
// The threshold when the circuit breaker change to open state
maxFailures: Joi.number().integer().default(3)
}).default()
bloomrun: Joi.object()
.keys({
indexing: Joi.any()
.valid(['insertion', 'depth'])
.default('insertion'),
// Checks if the pattern is no duplicate based on to the indexing strategy
lookupBeforeAdd: Joi.boolean().default(false)
})
.default(),
load: Joi.object()
.keys({
// Check on every request (server) if the load policy was observed,
checkPolicy: Joi.boolean().default(true),
// Should gracefully exit the process to recover from memory leaks or load, crashOnFatal must be enabled
shouldCrash: Joi.boolean().default(true),
process: Joi.object()
.keys({
// Frequency of load sampling in milliseconds (zero is no sampling)
sampleInterval: Joi.number()
.integer()
.default(0)
})
.default(),
policy: Joi.object()
.keys({
// Reject requests when V8 heap is over size in bytes (zero is no max)
maxHeapUsedBytes: Joi.number()
.integer()
.default(0),
// Reject requests when process RSS is over size in bytes (zero is no max)
maxRssBytes: Joi.number()
.integer()
.default(0),
// Milliseconds of delay after which requests are rejected (zero is no max)
maxEventLoopDelay: Joi.number()
.integer()
.default(0)
})
.default()
})
.default()
})

@@ -16,3 +16,8 @@ 'use strict'

// NATS conn error codes
NATS_CONN_ERROR_CODES: ['CONN_ERR', 'SECURE_CONN_REQ_MSG', 'NON_SECURE_CONN_REQ_MSG', 'CLIENT_CERT_REQ_MSG'],
NATS_CONN_ERROR_CODES: [
'CONN_ERR',
'SECURE_CONN_REQ_MSG',
'NON_SECURE_CONN_REQ_MSG',
'CLIENT_CERT_REQ_MSG'
],
// NATS erros

@@ -41,4 +46,2 @@ NATS_TRANSPORT_ERROR: 'Could not connect to NATS!',

ADD_MIDDLEWARE_ERROR: 'Middleware error',
PLUGIN_ALREADY_REGISTERED: 'Plugin was already registered',
PLUGIN_ADDED: 'PLUGIN - ADDED!',
PAYLOAD_VALIDATION_ERROR: 'Invalid payload',

@@ -52,10 +55,12 @@ ADD_ADDED: 'ADD - ADDED',

PLUGIN_NAME_REQUIRED: 'Plugin name is required',
PLUGIN_REGISTRATION_ERROR: 'Error during plugin registration',
DECORATION_ALREADY_DEFINED: 'Server decoration already defined',
OVERRIDE_BUILTIN_METHOD_NOT_ALLOWED: 'Cannot override the built-in server interface method',
MISSING_DECORATE_DEPENDENCY: 'Missing decorate dependency',
DECORATION_ALREADY_DEFINED: 'Decoration has been already added',
GRACEFULLY_SHUTDOWN: 'Gracefully shutdown',
PLUGIN_TIMEOUT_ERROR: 'Plugin callback was not called',
ACT_PATTERN_REQUIRED: 'Pattern is required to start an act call',
ADD_PATTERN_REQUIRED: 'Pattern is required to define an add',
NO_USE_IN_PLUGINS: 'Call `use()` inside plugins not allowed'
TERMINATE_AFTER_TIMEOUT: 'Terminate process after timeout',
PROCESS_TERMINATED: 'Process terminated',
TRIGGERING_CLOSE_HOOK: 'Triggering close hook',
REPLY_ERROR_ALREADY_SET: 'Error already set',
REPLY_ALREADY_SENT: 'Reply already sent'
}

@@ -13,3 +13,3 @@ 'use strict'

class Decoder {
static decode (msg) {
static decode(msg) {
return Parse(msg)

@@ -19,3 +19,3 @@ }

function Parse (data) {
function Parse(data) {
if (!(this instanceof Parse)) {

@@ -22,0 +22,0 @@ return new Parse(data)

@@ -15,3 +15,3 @@ 'use strict'

class Encoder {
static encode (msg) {
static encode(msg) {
try {

@@ -18,0 +18,0 @@ return {

@@ -11,26 +11,66 @@ 'use strict'

*/
const BaseExtension = require('./baseExtension')
const _ = require('lodash')
const Constants = require('./constants')
const Errors = require('./errors')
/**
*
*
* @class Extension
*/
class Extension extends BaseExtension {
class Extension {
constructor() {
this._stack = []
this._types = [
'onClientPreRequest',
'onClientPostRequest',
'onServerPreHandler',
'onServerPreRequest',
'onServerPreResponse',
'onClose'
]
this.onClientPreRequest = []
this.onClientPostRequest = []
this.onServerPreHandler = []
this.onServerPreRequest = []
this.onServerPreResponse = []
}
/**
* Executes the stack of functions
*
* @param {any} ctx
* @param {any} cb
*
* @param {any} handler
*
* @memberof Extension
*/
dispatch (ctx, cb) {
const each = (item, next) => {
item(ctx, next)
_add(type, handler) {
if (this._types.indexOf(type) === -1) {
let error = new Errors.HemeraError(Constants.INVALID_EXTENSION_TYPE, {
type
})
throw error
}
this.run(each, cb)
this[type].push((arg, next) => {
arg.push(next)
handler.apply(null, arg)
})
}
/**
*
*
* @param {any} handler
*
* @memberOf Extension
*/
add(type, handler) {
if (_.isArray(handler)) {
handler.forEach(h => this._add(type, h))
} else {
this._add(type, handler)
}
}
}
module.exports = Extension

@@ -15,3 +15,2 @@ 'use strict'

const Errors = require('./errors')
const CircuitBreaker = require('./circuitBreaker')

@@ -24,42 +23,45 @@ /**

*/
function onClientPreRequest (ctx, next) {
let pattern = ctx._pattern
function onClientPreRequest(context, next) {
let pattern = context._pattern
let prevCtx = ctx._prevContext
let cleanPattern = ctx._cleanPattern
let prevCtx = context._prevContext
let cleanPattern = context._cleanPattern
let currentTime = Util.nowHrTime()
// shared context
ctx.context$ = pattern.context$ || prevCtx.context$
context.context$ = pattern.context$ || prevCtx.context$
// set metadata by passed pattern or current message context
ctx.meta$ = Object.assign(pattern.meta$ || {}, ctx.meta$)
context.meta$ = Object.assign(pattern.meta$ || {}, context.meta$)
// is only passed by msg
ctx.delegate$ = pattern.delegate$ || {}
context.delegate$ = pattern.delegate$ || {}
// tracing
ctx.trace$ = pattern.trace$ || {}
ctx.trace$.parentSpanId = ctx.trace$.spanId || prevCtx.trace$.spanId
ctx.trace$.traceId = ctx.trace$.traceId || prevCtx.trace$.traceId || Util.randomId()
ctx.trace$.spanId = Util.randomId()
ctx.trace$.timestamp = currentTime
ctx.trace$.service = pattern.topic
ctx.trace$.method = Util.pattern(pattern)
context.trace$ = pattern.trace$ || {}
context.trace$.parentSpanId = context.trace$.spanId || prevCtx.trace$.spanId
context.trace$.traceId =
context.trace$.traceId || prevCtx.trace$.traceId || Util.randomId()
context.trace$.spanId = Util.randomId()
context.trace$.timestamp = currentTime
context.trace$.service = pattern.topic
context.trace$.method = Util.pattern(pattern)
// detect recursion
if (ctx._config.maxRecursion > 1) {
const callSignature = `${ctx.trace$.traceId}:${ctx.trace$.method}`
if (ctx.meta$ && ctx.meta$.referrers) {
var count = ctx.meta$.referrers[callSignature]
if (context._config.maxRecursion > 1) {
const callSignature = `${context.trace$.traceId}:${context.trace$.method}`
if (context.meta$ && context.meta$.referrers) {
var count = context.meta$.referrers[callSignature]
count += 1
ctx.meta$.referrers[callSignature] = count
if (count > ctx._config.maxRecursion) {
ctx.meta$.referrers = null
return next(new Errors.MaxRecursionError({
count: --count
}))
context.meta$.referrers[callSignature] = count
if (count > context._config.maxRecursion) {
context.meta$.referrers = null
return next(
new Errors.MaxRecursionError({
count: --count
})
)
}
} else {
ctx.meta$.referrers = {}
ctx.meta$.referrers[callSignature] = 1
context.meta$.referrers = {}
context.meta$.referrers[callSignature] = 1
}

@@ -71,7 +73,10 @@ }

id: pattern.requestId$ || Util.randomId(),
parentId: ctx.request$.id || pattern.requestParentId$,
type: pattern.pubsub$ === true ? Constants.REQUEST_TYPE_PUBSUB : Constants.REQUEST_TYPE_REQUEST
parentId: context.request$.id || pattern.requestParentId$,
type:
pattern.pubsub$ === true
? Constants.REQUEST_TYPE_PUBSUB
: Constants.REQUEST_TYPE_REQUEST
}
ctx.emit('clientPreRequest', ctx)
context.emit('clientPreRequest', context)

@@ -81,12 +86,12 @@ // build msg

pattern: cleanPattern,
meta: ctx.meta$,
delegate: ctx.delegate$,
trace: ctx.trace$,
meta: context.meta$,
delegate: context.delegate$,
trace: context.trace$,
request: request
}
ctx._message = message
context._message = message
ctx.log.debug({
outbound: ctx
context.log.debug({
outbound: context
})

@@ -101,53 +106,26 @@

* @param {any} next
* @returns
*/
function onClientPreRequestCircuitBreaker (ctx, next) {
if (ctx._config.circuitBreaker.enabled) {
// any pattern represent an own circuit breaker
const circuitBreaker = ctx._circuitBreakerMap.get(ctx.trace$.method)
if (!circuitBreaker) {
const cb = new CircuitBreaker(ctx._config.circuitBreaker)
ctx._circuitBreakerMap.set(ctx.trace$.method, cb)
} else {
if (!circuitBreaker.available()) {
// trigger half-open timer
circuitBreaker.record()
return next(new Errors.CircuitBreakerError(`Circuit breaker is ${circuitBreaker.state}`, { state: circuitBreaker.state, method: ctx.trace$.method, service: ctx.trace$.service }))
}
}
function onClientPostRequest(context, next) {
let pattern = context._pattern
let msg = context._response.payload
next()
} else {
next()
}
}
/**
*
*
* @param {any} next
*/
function onClientPostRequest (ctx, next) {
let pattern = ctx._pattern
let msg = ctx._response.payload
// pass to act context
if (msg) {
ctx.request$ = msg.request || {}
ctx.trace$ = msg.trace || {}
ctx.meta$ = msg.meta || {}
context.request$ = msg.request || {}
context.trace$ = msg.trace || {}
context.meta$ = msg.meta || {}
}
// calculate request duration
let diff = Util.nowHrTime() - ctx.trace$.timestamp
ctx.trace$.duration = diff
let diff = Util.nowHrTime() - context.trace$.timestamp
context.trace$.duration = diff
ctx.request$.service = pattern.topic
ctx.request$.method = ctx.trace$.method
context.request$.service = pattern.topic
context.request$.method = context.trace$.method
ctx.log.debug({
inbound: ctx
context.log.debug({
inbound: context
})
ctx.emit('clientPostRequest', ctx)
context.emit('clientPostRequest', context)

@@ -165,7 +143,8 @@ next()

*/
function onServerPreRequest (ctx, req, res, next) {
let m = ctx._decoderPipeline.run(ctx._request.payload, ctx)
function onServerPreRequest(context, req, res, next) {
let m = context._decoderPipeline.run(context._request.payload, context)
if (m.error) {
return res.send(m.error)
next(m.error)
return
}

@@ -176,18 +155,18 @@

if (msg) {
ctx.meta$ = msg.meta || {}
ctx.trace$ = msg.trace || {}
ctx.delegate$ = msg.delegate || {}
ctx.request$ = msg.request || {}
ctx.auth$ = {}
context.meta$ = msg.meta || {}
context.trace$ = msg.trace || {}
context.delegate$ = msg.delegate || {}
context.request$ = msg.request || {}
context.auth$ = {}
}
ctx._request.payload = m.value
ctx._request.error = m.error
context._request.payload = m.value
context._request.error = m.error
// icnoming pattern
ctx._pattern = ctx._request.payload.pattern
context._pattern = context._request.payload.pattern
// find matched route
ctx._actMeta = ctx._router.lookup(ctx._pattern)
context._actMeta = context._router.lookup(context._pattern)
ctx.emit('serverPreRequest', ctx)
context.emit('serverPreRequest', context)

@@ -205,7 +184,7 @@ next()

*/
function onServerPreRequestLoadTest (ctx, req, res, next) {
if (ctx._config.load.checkPolicy) {
const error = ctx._loadPolicy.check()
function onServerPreRequestLoadTest(context, req, res, next) {
if (context._config.load.checkPolicy) {
const error = context._loadPolicy.check()
if (error) {
ctx._shouldCrash = ctx._config.load.shouldCrash
context._shouldCrash = context._config.load.shouldCrash
return next(new Errors.ProcessLoadError(error.message, error.data))

@@ -225,4 +204,4 @@ }

*/
function onServerPreHandler (ctx, req, res, next) {
ctx.emit('serverPreHandler', ctx)
function onServerPreHandler(context, req, res, next) {
context.emit('serverPreHandler', context)

@@ -232,4 +211,4 @@ next()

function onServerPreResponse (ctx, req, res, next) {
ctx.emit('serverPreResponse', ctx)
function onServerPreResponse(context, req, res, next) {
context.emit('serverPreResponse', context)

@@ -239,6 +218,9 @@ next()

module.exports.onClientPreRequest = [onClientPreRequest, onClientPreRequestCircuitBreaker]
module.exports.onClientPreRequest = [onClientPreRequest]
module.exports.onClientPostRequest = [onClientPostRequest]
module.exports.onServerPreRequest = [onServerPreRequest, onServerPreRequestLoadTest]
module.exports.onServerPreRequest = [
onServerPreRequest,
onServerPreRequestLoadTest
]
module.exports.onServerPreHandler = [onServerPreHandler]
module.exports.onServerPreResponse = [onServerPreResponse]

@@ -19,3 +19,2 @@ 'use strict'

const Errio = require('errio')
const Hoek = require('hoek')
const Heavy = require('heavy')

@@ -26,10 +25,9 @@ const _ = require('lodash')

const SuperError = require('super-error')
const Co = require('co')
const Hoek = require('hoek')
const Joi = require('joi')
const Avvio = require('avvio')
const Series = require('fastseries')
const GracefulShutdown = require('./gracefulShutdown')
const Errors = require('./errors')
const Constants = require('./constants')
const Extension = require('./extension')
const ServerExtension = require('./serverExtension')
const Util = require('./util')

@@ -47,13 +45,7 @@ const NatsTransport = require('./transport')

const CodecPipeline = require('./codecPipeline')
const Reply = require('./reply')
const Add = require('./add')
const Plugin = require('./plugin')
const Extension = require('./extension')
const pDefer = require('p-defer')
// Extension finish handler
const onServerPreResponse = require('./onServerPreResponse')
const onServerPreRequest = require('./onServerPreRequest')
const onClientTimeoutPostRequest = require('./onClientTimeoutPostRequest')
const onPreRequest = require('./onPreRequest')
const onClientPostRequest = require('./onClientPostRequest')
const onClose = require('./onClose')
/**

@@ -71,3 +63,3 @@ * @class Hemera

*/
constructor (transport, params) {
constructor(transport, params) {
super()

@@ -87,3 +79,7 @@

this._topics = {}
this._exposition = {}
this.plugin$ = {
options: {
name: 'core'
}
}

@@ -95,8 +91,2 @@ // special variables for the new execution context

this.auth$ = {}
this.plugin$ = new Plugin({
options: {},
attributes: {
name: 'core'
}
})
this.trace$ = {}

@@ -120,26 +110,12 @@ this.request$ = {

this._cleanPattern = ''
this._pluginRegistrations = []
this._decorations = {}
// create reference to root hemera instance
this._root = this
// contains the list of all registered plugins
// the core is also a plugin
this._plugins = {
core: this.plugin$
core: this
}
// keep reference to root hemera instance
this._root = this
this._encoderPipeline = new CodecPipeline().add(DefaultEncoder.encode)
this._decoderPipeline = new CodecPipeline().add(DefaultDecoder.decode)
// define extension points
this._extensions = {
onClientPreRequest: new Extension('onClientPreRequest'),
onClientPostRequest: new Extension('onClientPostRequest'),
onServerPreHandler: new ServerExtension('onServerPreHandler'),
onServerPreRequest: new ServerExtension('onServerPreRequest'),
onServerPreResponse: new ServerExtension('onServerPreResponse'),
onClose: new Extension('onClose')
}
// errio settings

@@ -160,13 +136,53 @@ Errio.setDefaults(this._config.errio)

// will be executed before the client request is executed.
this._extensions.onClientPreRequest.add(DefaultExtensions.onClientPreRequest)
// will be executed after the client has received and decoded the request
this._extensions.onClientPostRequest.add(DefaultExtensions.onClientPostRequest)
// will be executed before the server has received the requests
this._extensions.onServerPreRequest.add(DefaultExtensions.onServerPreRequest)
// will be executed before the server action is executed
this._extensions.onServerPreHandler.add(DefaultExtensions.onServerPreHandler)
// will be executed before the server has replied the response and build the message
this._extensions.onServerPreResponse.add(DefaultExtensions.onServerPreResponse)
this._ext = new Extension()
this._ext.add('onClientPreRequest', DefaultExtensions.onClientPreRequest)
this._ext.add('onClientPostRequest', DefaultExtensions.onClientPostRequest)
this._ext.add('onServerPreRequest', DefaultExtensions.onServerPreRequest)
this._ext.add('onServerPreHandler', DefaultExtensions.onServerPreHandler)
this._ext.add('onServerPreResponse', DefaultExtensions.onServerPreResponse)
this._avvio = Avvio(this, {
expose: {
use: 'register',
close: 'shutdown',
onClose: 'onShutdown',
ready: 'bootstrap'
}
})
this._avvio.override = (hemera, plugin, opts) => {
const res = Object.create(hemera)
const proto = Object.getPrototypeOf(res)
if (hemera._config.childLogger) {
res.log = hemera.log.child({ plugin: opts.name })
}
// plugin seperated prototype propertys
res.plugin$ = {
options: opts,
name: opts.name
}
// extend prototype so that each nested plugin have access
res.decorate = function decorate(prop, value, deps) {
if (prop in this) {
throw new Errors.HemeraError(Constants.DECORATION_ALREADY_DEFINED)
}
if (deps) {
res._checkDecoraterDependencies(deps)
}
// extend prototype
proto[prop] = value
}
this._plugins[opts.name] = res
return res
}
this._series = Series()
// use own logger

@@ -178,8 +194,11 @@ if (this._config.logger) {

let pretty = Pino.pretty()
this.log = Pino({
name: this._config.name,
safe: true, // avoid error caused by circular references
level: this._config.logLevel,
serializers: Serializers
}, pretty)
this.log = Pino(
{
name: this._config.name,
safe: true, // avoid error caused by circular references
level: this._config.logLevel,
serializers: Serializers
},
pretty
)

@@ -199,6 +218,2 @@ // Leads to too much listeners in tests

}
this._gracefulShutdown = new GracefulShutdown(this.log)
this._gracefulShutdown.addHandler((code, cb) => this.close(cb))
this._gracefulShutdown.init()
}

@@ -211,3 +226,3 @@

*/
_registerErrors () {
_registerErrors() {
for (var error in Hemera.errors) {

@@ -223,3 +238,3 @@ Errio.register(Hemera.errors[error])

*/
get decoder () {
get decoder() {
return this._decoderPipeline

@@ -234,3 +249,3 @@ }

*/
get encoder () {
get encoder() {
return this._encoderPipeline

@@ -240,13 +255,2 @@ }

/**
* Return all registered plugins
*
* @readonly
*
* @memberOf Hemera
*/
get plugins () {
return this._plugins
}
/**
* Return the bloomrun instance

@@ -258,3 +262,3 @@ *

*/
get router () {
get router() {
return this._router

@@ -270,3 +274,3 @@ }

*/
get load () {
get load() {
return this._heavy.load

@@ -276,13 +280,2 @@ }

/**
* Return the shared object of all exposed data
*
* @readonly
* @type {Exposition}
* @memberOf Hemera
*/
get exposition () {
return this._exposition
}
/**
* Return the underlying NATS driver

@@ -294,3 +287,3 @@ *

*/
get transport () {
get transport() {
return this._transport.driver

@@ -306,3 +299,3 @@ }

*/
get topics () {
get topics() {
return this._topics

@@ -318,3 +311,3 @@ }

*/
get config () {
get config() {
return this._config

@@ -330,3 +323,3 @@ }

*/
get errorDetails () {
get errorDetails() {
if (this._isServer) {

@@ -355,3 +348,3 @@ return {

*/
static get errors () {
static get errors() {
return Errors

@@ -367,3 +360,3 @@ }

*/
static createError (name) {
static createError(name) {
const ctor = SuperError.subclass(name)

@@ -376,22 +369,2 @@ // Register the class with Errio.

/**
* Exposed data in context of the current plugin
* It is accessible by this.expositions[<plugin>][<key>]
*
* @param {string} key
* @param {mixed} object
*
* @memberOf Hemera
*/
expose (key, object) {
let pluginName = this.plugin$.attributes.name
if (!this._exposition[pluginName]) {
this._exposition[pluginName] = {}
this._exposition[pluginName][key] = object
} else {
this._exposition[pluginName][key] = object
}
}
/**
* Add an extension. Extensions are called in serie

@@ -404,71 +377,18 @@ *

*/
ext (type, handler) {
if (!this._extensions[type]) {
let error = new Errors.HemeraError(Constants.INVALID_EXTENSION_TYPE, {
type
})
this.log.error(error)
throw error
ext(type, handler) {
if (type === 'onClose') {
this.onShutdown(handler)
} else {
this._ext.add(type, handler)
}
this._extensions[type].add(handler)
}
/**
* Use a plugin.
*
* @param {any} plugin
*
* @memberOf Hemera
* @readonly
* @memberof Hemera
*/
use (params, options) {
// use plugin infos from package.json
if (_.isObject(params.attributes.pkg)) {
params.attributes = params.attributes || {}
params.attributes = Hoek.applyToDefaults(params.attributes, _.pick(params.attributes.pkg, ['name', 'description', 'version']))
}
let pluginOptions = {}
// pass options as second argument during plugin registration
if (_.isObject(options)) {
pluginOptions = Hoek.clone(params.options) || {}
pluginOptions = Hoek.applyToDefaults(pluginOptions, options)
} else if (params.options) {
pluginOptions = Hoek.clone(params.options)
}
// plugin name is required
if (!params.attributes.name) {
let error = new Errors.HemeraError(Constants.PLUGIN_NAME_REQUIRED)
this.log.error(error)
this.emit('error', error)
return
}
// create new execution context
let ctx = this.createContext()
const plugin = new Plugin({
register: params.plugin.bind(ctx),
attributes: params.attributes,
parentPluginName: this.plugin$.attributes.name,
options: pluginOptions
})
ctx.plugin$ = plugin
if (ctx._config.childLogger) {
ctx.log = this.log.child({ plugin: plugin.attributes.name })
}
ctx.use = () => {
ctx.log.error(Constants.NO_USE_IN_PLUGINS)
throw new Errors.HemeraError(Constants.NO_USE_IN_PLUGINS)
}
this._pluginRegistrations.push(plugin)
this.log.info(params.attributes.name, Constants.PLUGIN_ADDED)
this._plugins[params.attributes.name] = plugin
get plugins() {
return this._plugins
}

@@ -484,3 +404,3 @@

*/
setOption (key, value) {
setOption(key, value) {
this.plugin$.options[key] = value

@@ -495,3 +415,3 @@ }

*/
setConfig (key, value) {
setConfig(key, value) {
this._config[key] = value

@@ -501,2 +421,12 @@ }

/**
* Return the root instance
*
* @returns
* @memberof Hemera
*/
root() {
return this._root
}
/**
* Exit the process

@@ -506,9 +436,20 @@ *

*/
fatal () {
this._gracefulShutdown.shutdown('fatal')
fatal() {
process.exit(1)
}
/**
*
*
* @param {any} prop
* @returns
* @memberof Hemera
*/
hasDecorator(prop) {
return prop in this
}
/**
* Decorate the root instance
* Value is globaly accesible
* Value is globaly accessible
*

@@ -520,30 +461,26 @@ * @param {any} prop

*/
decorate (prop, value) {
if (this._decorations[prop]) {
throw new Error(Constants.DECORATION_ALREADY_DEFINED)
} else if (this[prop]) {
throw new Error(Constants.OVERRIDE_BUILTIN_METHOD_NOT_ALLOWED)
decorate(prop, value, deps) {
if (this[prop]) {
throw new Errors.HemeraError(Constants.DECORATION_ALREADY_DEFINED)
}
this._decorations[prop] = {
plugin: this.plugin$,
value
if (deps) {
this._checkDecoraterDependencies(deps)
}
// decorate root hemera instance
this._root[prop] = value
this[prop] = value
}
/**
* Create a custom super error object in a running hemera instance
*
* @param {any} name
* @returns
*
* @memberOf Hemera
* @param {any} deps
* @memberof Hemera
*/
createError (name) {
const ctor = SuperError.subclass(name)
// Register the class with Errio.
Errio.register(ctor)
return ctor
_checkDecoraterDependencies(deps) {
for (var i = 0; i < deps.length; i++) {
if (!(deps in this)) {
throw new Error(Constants.MISSING_DECORATE_DEPENDENCY)
}
}
}

@@ -554,45 +491,61 @@

*
* @param {any} plugins
* @returns
* @memberof Hemera
*/
_use(plugin, opts, cb) {
const pluginOpts = Hoek.clone(plugin.options || {})
const options = Hoek.applyToDefaults(pluginOpts, opts || {}, true)
if (!options.name) {
throw new Errors.HemeraError(Constants.PLUGIN_NAME_REQUIRED)
}
this.register(plugin.plugin, options, cb)
}
/**
*
*
* @param {any} plugin
* @param {any} opts
* @param {any} cb
* @returns
* @memberof Hemera
*/
registerPlugins (cb) {
const each = (item, next) => {
// plugin has no callback
if (item.register.length < 2) {
item.register(item.options)
return next()
}
use(plugin, opts, cb) {
// when opts is omitted
if (_.isFunction(opts)) {
cb = opts
opts = {}
}
// Detect plugin timeouts
const pluginTimer = setTimeout(() => {
const internalError = new Errors.PluginTimeoutError(Constants.PLUGIN_TIMEOUT_ERROR)
this.log.error(internalError, `Plugin: ${item.attributes.name}`)
next(internalError)
}, this._config.pluginTimeout)
item.register(item.options, (err) => {
clearTimeout(pluginTimer)
next(err)
if (_.isArray(plugin)) {
plugin.forEach(p => {
this._use(p, opts, cb)
})
} else {
this._use(plugin, opts, cb)
}
// register all plugins in serie
Util.serial(this._pluginRegistrations, each, (err) => {
if (err) {
if (err instanceof SuperError) {
err = err.rootCause || err.cause || err
}
const internalError = new Errors.HemeraError(Constants.PLUGIN_REGISTRATION_ERROR).causedBy(err)
this.log.error(internalError)
this.emit('error', internalError)
} else if (_.isFunction(cb)) {
cb.call(this)
}
})
return this._avvio
}
/**
* Create a custom super error object in a running hemera instance
*
* @param {any} name
* @returns
*
* @memberOf Hemera
*/
createError(name) {
const ctor = SuperError.subclass(name)
// Register the class with Errio.
Errio.register(ctor)
return ctor
}
/**
*
*
* @param {Function} cb

@@ -602,6 +555,6 @@ *

*/
ready (cb) {
this._transport.driver.on('error', (err) => {
ready(cb) {
this._transport.driver.on('error', err => {
this.log.error(err, Constants.NATS_TRANSPORT_ERROR)
this.log.error('NATS Code: \'%s\', Message: %s', err.code, err.message)
this.log.error("NATS Code: '%s', Message: %s", err.code, err.message)

@@ -616,3 +569,3 @@ // Exit only on connection issues.

this._transport.driver.on('permission_error', (err) => {
this._transport.driver.on('permission_error', err => {
this.log.error(err, Constants.NATS_PERMISSION_ERROR)

@@ -639,3 +592,7 @@ })

this.log.info(Constants.NATS_TRANSPORT_CONNECTED)
this.registerPlugins(cb)
this.bootstrap(err => {
if (_.isFunction(cb)) {
cb(err)
}
})
})

@@ -645,49 +602,111 @@ }

/**
* Build the final payload for the response
* Last step before the response is send to the callee.
* The preResponse extension is dispatched and previous errors are evaluated.
*
*
* @memberOf Hemera
*/
_buildMessage () {
let result = this._response
finish() {
const self = this
const args = [self, self._request, self._reply]
self._series(self, self._ext['onServerPreResponse'], args, err =>
self._onServerPreResponseCompleted(err)
)
}
/**
*
*
* @param {any} extensionError
* @memberof Hemera
*/
_onServerPreResponseCompleted(extensionError) {
const self = this
// check if any error was set before
if (extensionError) {
self._reply.error = extensionError
const internalError = new Errors.HemeraError(
Constants.EXTENSION_ERROR,
self.errorDetails
).causedBy(extensionError)
self.log.error(internalError)
self.emit('serverResponseError', self._reply.error)
}
const result = self._createMessage()
self._send(result)
}
/**
*
*
* @param {any} msg
* @memberof Hemera
*/
_send(msg) {
const self = this
// indicates that an error occurs and that the program should exit
if (self._shouldCrash) {
// only when we have an inbox othwerwise exit the service immediately
if (self._replyTo) {
// send error back to callee
self._transport.send(self._replyTo, msg, () => {
// let it crash
if (self._config.crashOnFatal) {
self.fatal()
}
})
return
} else if (self._config.crashOnFatal) {
self.fatal()
return
}
}
// reply only when we have an inbox
if (self._replyTo) {
self._transport.send(self._replyTo, msg)
}
}
/**
*
*
* @param {any} err
* @returns
* @memberof Hemera
*/
_createMessage() {
const self = this
let message = {
meta: this.meta$ || {},
trace: this.trace$ || {},
request: this.request$,
result: result.error ? null : result.payload,
error: result.error ? Errio.toObject(result.error) : null
meta: self.meta$ || {},
trace: self.trace$ || {},
request: self.request$,
result: self._reply.error ? null : self._reply.payload,
error: self._reply.error ? Errio.toObject(self._reply.error) : null
}
let m = this._encoderPipeline.run(message, this)
let m = self._encoderPipeline.run(message, self)
// attach encoding issues
if (m.error) {
let internalError = new Errors.ParseError(Constants.PAYLOAD_PARSING_ERROR).causedBy(m.error)
let internalError = new Errors.ParseError(
Constants.PAYLOAD_PARSING_ERROR
).causedBy(m.error)
message.error = Errio.toObject(internalError)
message.result = null
// Retry to encode with issue perhaps the reason was data related
m = this._encoderPipeline.run(message, this)
this.log.error(internalError)
this.emit('serverResponseError', m.error)
m = self._encoderPipeline.run(message, self)
self.log.error(internalError)
self.emit('serverResponseError', m.error)
}
// final response
this._message = m.value
return m.value
}
/**
* Last step before the response is send to the callee.
* The preResponse extension is dispatched and previous errors are evaluated.
*
* @memberOf Hemera
*/
finish () {
this._extensions.onServerPreResponse
.dispatch(this, (err, val) => onServerPreResponse(this, err, val))
}
/**
*
*
* @param {any} err

@@ -699,3 +718,3 @@ * @param {any} resp

*/
_actionHandler (err, resp) {
_actionHandler(err, resp) {
const self = this

@@ -718,15 +737,12 @@

if (err instanceof SuperError) {
self._response.error = err.rootCause || err.cause || err
} else {
self._response.error = err
}
self._reply.error = self.getRootError(err)
return self.finish()
self.finish()
return
}
// assign action result
self._response.payload = resp
// delete error we have payload
self._response.error = null
// set only when we have a result and the reply interface wasn't called
if (resp && !self._reply.sent) {
self._reply.payload = resp
}

@@ -747,3 +763,3 @@ self.finish()

*/
subscribe (topic, subToMany, maxMessages, queue) {
subscribe(topic, subToMany, maxMessages, queue) {
const self = this

@@ -759,14 +775,16 @@

// create new execution context
let ctx = this.createContext()
ctx._shouldCrash = false
ctx._replyTo = replyTo
ctx._topic = topic
ctx._request = new ServerRequest(request)
ctx._response = new ServerResponse()
ctx._pattern = {}
ctx._actMeta = {}
ctx._isServer = true
let hemera = self.createContext()
hemera._topic = topic
hemera._replyTo = replyTo
hemera._request = new ServerRequest(request)
hemera._response = new ServerResponse()
hemera._reply = new Reply(hemera._request, hemera._response, hemera.log)
hemera._pattern = {}
hemera._actMeta = {}
hemera._isServer = true
ctx._extensions.onServerPreRequest
.dispatch(ctx, (err, val) => onServerPreRequest(ctx, err, val))
const args = [hemera, hemera._request, hemera._reply]
hemera._series(hemera, self._ext['onServerPreRequest'], args, err =>
hemera._onServerPreRequestCompleted(err)
)
}

@@ -776,12 +794,20 @@

if (subToMany) {
self._topics[topic] = self._transport.subscribe(topic, {
max: maxMessages
}, handler)
self._topics[topic] = self._transport.subscribe(
topic,
{
max: maxMessages
},
handler
)
} else {
const queueGroup = queue || `${Constants.NATS_QUEUEGROUP_PREFIX}.${topic}`
// queue group names allow load balancing of services
self._topics[topic] = self._transport.subscribe(topic, {
'queue': queueGroup,
max: maxMessages
}, handler)
self._topics[topic] = self._transport.subscribe(
topic,
{
queue: queueGroup,
max: maxMessages
},
handler
)
}

@@ -791,2 +817,107 @@ }

/**
*
* @param {any} extensionError
* @param {any} value
* @memberof Hemera
*/
_onServerPreRequestCompleted(extensionError) {
const self = this
// check if any error was set before
if (extensionError) {
self._reply.error = extensionError
self.finish()
return
}
// check if a handler is registered with this pattern
if (self._actMeta) {
const args = [self, self._request, self._reply]
self._series(self, self._ext['onServerPreHandler'], args, err =>
self._onServerPreHandlerCompleted(err)
)
} else {
const internalError = new Errors.PatternNotFound(
Constants.PATTERN_NOT_FOUND,
self.errorDetails
)
self.log.error(internalError)
self._reply.error = internalError
self.emit('serverResponseError', self._reply.error)
// send error back to callee
self.finish()
}
}
/**
*
*
* @param {any} extensionError
* @memberof Hemera
*/
_onServerPreHandlerCompleted(extensionError) {
const self = this
// check if any error was set before
if (extensionError) {
self._reply.error = extensionError
const internalError = new Errors.HemeraError(
Constants.EXTENSION_ERROR,
self.errorDetails
).causedBy(extensionError)
self.log.error(internalError)
self.emit('serverResponseError', self._reply.error)
self.finish()
return
}
try {
let action = self._actMeta.action.bind(self)
self._actMeta.run(self._request, self._reply, err => {
if (err) {
self._reply.error = err
const internalError = new Errors.HemeraError(
Constants.ADD_MIDDLEWARE_ERROR,
self.errorDetails
).causedBy(err)
self.log.error(internalError)
self.emit('serverResponseError', self._reply.error)
self.finish()
return
}
// if request type is 'pubsub' we dont have to reply back
if (
self._request.payload.request.type === Constants.REQUEST_TYPE_PUBSUB
) {
action(self._request.payload.pattern)
self.finish()
return
}
// execute RPC action
if (self._actMeta.isPromisable) {
action(self._request.payload.pattern)
.then(x => self._actionHandler(null, x))
.catch(e => self._actionHandler(e))
} else {
action(self._request.payload.pattern, (err, result) =>
self._actionHandler(err, result)
)
}
})
} catch (err) {
self._reply.error = self.getRootError(err)
// service should exit
self._shouldCrash = true
self.finish()
}
}
/**
* Unsubscribe a topic or subscription id from NATS and Hemera

@@ -800,7 +931,9 @@ *

*/
remove (topic, maxMessages) {
remove(topic, maxMessages) {
const self = this
if (!topic) {
let error = new Errors.HemeraError(Constants.TOPIC_SID_REQUIRED_FOR_DELETION)
let error = new Errors.HemeraError(
Constants.TOPIC_SID_REQUIRED_FOR_DELETION
)
self.log.error(error)

@@ -832,3 +965,3 @@ throw error

*/
cleanTopic (topic) {
cleanTopic(topic) {
// release topic so we can add it again

@@ -852,3 +985,3 @@ delete this._topics[topic]

*/
add (pattern, cb) {
add(pattern, cb) {
// check for use quick syntax for JSON objects

@@ -888,3 +1021,2 @@ if (_.isString(pattern)) {

// cb is null when we use chaining syntax
if (cb) {

@@ -908,5 +1040,8 @@ // set callback

this.log.error({
pattern
}, error)
this.log.error(
{
pattern
},
error
)
this.emit('error', error)

@@ -925,3 +1060,4 @@ }

pattern.maxMessages$,
pattern.queue$)
pattern.queue$
)

@@ -938,3 +1074,3 @@ return addDefinition

*/
_sendRequestHandler (response) {
_sendRequestHandler(response) {
const self = this

@@ -948,6 +1084,8 @@ const res = self._decoderPipeline.run(response, self)

if (self._response.error) {
let error = new Errors.ParseError(Constants.PAYLOAD_PARSING_ERROR, self.errorDetails).causedBy(self._response.error)
let error = new Errors.ParseError(
Constants.PAYLOAD_PARSING_ERROR,
self.errorDetails
).causedBy(self._response.error)
self.log.error(error)
self.emit('clientResponseError', error)
self._execute(error)

@@ -957,12 +1095,12 @@ return

self._extensions.onClientPostRequest.dispatch(self, (err) => onClientPostRequest(self, err))
// Execute onClientPostRequest extension
self._series(self, self._ext['onClientPostRequest'], [self], err =>
self._onClientPostRequestCompleted(err)
)
} catch (err) {
let error = null
if (err instanceof SuperError) {
error = err.rootCause || err.cause || err
} else {
error = err
}
const internalError = new Errors.FatalError(Constants.FATAL_ERROR, self.errorDetails).causedBy(err)
let error = self.getRootError(err)
const internalError = new Errors.FatalError(
Constants.FATAL_ERROR,
self.errorDetails
).causedBy(err)
self.log.fatal(internalError)

@@ -974,2 +1112,4 @@ self.emit('clientResponseError', error)

self.fatal()
} else {
self._execute(error)
}

@@ -980,2 +1120,39 @@ }

/**
*
*
* @param {any} extensionError
* @memberof Hemera
*/
_onClientPostRequestCompleted(extensionError) {
const self = this
if (extensionError) {
let error = self.getRootError(extensionError)
const internalError = new Errors.HemeraError(
Constants.EXTENSION_ERROR,
self.errorDetails
).causedBy(extensionError)
self.log.error(internalError)
self.emit('clientResponseError', error)
self._execute(error)
return
}
if (self._response.payload.error) {
let error = Errio.fromObject(self._response.payload.error)
const internalError = new Errors.BusinessError(
Constants.BUSINESS_ERROR,
self.errorDetails
).causedBy(error)
self.log.error(internalError)
self.emit('clientResponseError', error)
self._execute(error)
return
}
self._execute(null, self._response.payload.result)
}
/**
* Start an action.

@@ -988,3 +1165,3 @@ *

*/
act (pattern, cb) {
act(pattern, cb) {
// check for use quick syntax for JSON objects

@@ -1002,16 +1179,20 @@ if (_.isString(pattern)) {

// create new execution context
let ctx = this.createContext()
ctx._pattern = pattern
ctx._prevContext = this
ctx._cleanPattern = Util.cleanFromSpecialVars(pattern)
ctx._response = new ClientResponse()
ctx._request = new ClientRequest()
ctx._isServer = false
ctx._execute = null
ctx._hasCallback = false
ctx._isPromisable = false
let hemera = this.createContext()
hemera._pattern = pattern
hemera._prevContext = this
hemera._cleanPattern = Util.cleanFromSpecialVars(pattern)
hemera._response = new ClientResponse()
hemera._request = new ClientRequest()
hemera._isServer = false
hemera._execute = null
hemera._defer = pDefer()
hemera._isPromisable = false
hemera._actCallback = null
// topic is needed to subscribe on a subject in NATS
if (!pattern.topic) {
let error = new Errors.HemeraError(Constants.NO_TOPIC_TO_REQUEST, ctx.errorDetails)
let error = new Errors.HemeraError(
Constants.NO_TOPIC_TO_REQUEST,
hemera.errorDetails
)
this.log.error(error)

@@ -1022,57 +1203,117 @@ throw error

if (cb) {
ctx._hasCallback = true
if (Util.isGeneratorFunction(cb)) {
ctx._actCallback = Co.wrap(cb.bind(ctx))
ctx._isPromisable = true
} else if (Util.isAsyncFunction(cb)) {
ctx._actCallback = cb.bind(ctx)
ctx._isPromisable = true
if (Util.isAsyncFunction(cb)) {
hemera._actCallback = cb.bind(hemera)
hemera._isPromisable = true
} else {
ctx._actCallback = cb.bind(ctx)
ctx._isPromisable = false
hemera._actCallback = cb.bind(hemera)
hemera._isPromisable = false
}
}
const promise = new Promise((resolve, reject) => {
ctx._execute = (err, result) => {
if (ctx._config.circuitBreaker.enabled) {
const circuitBreaker = ctx._circuitBreakerMap.get(ctx.trace$.method)
if (err) {
circuitBreaker.failure()
} else {
circuitBreaker.success()
}
}
if (ctx._hasCallback) {
if (ctx._isPromisable) {
ctx._actCallback(err, result)
.then(x => resolve(x))
.catch(x => reject(x))
} else {
// any return value in a callback function will fullfilled the
// promise but an error will reject it
const r = ctx._actCallback(err, result)
if (r instanceof Error) {
reject(r)
} else {
resolve(r)
}
}
hemera._execute = (err, result) => {
if (hemera._isPromisable) {
hemera
._actCallback(err, result)
.then(hemera._defer.resolve)
.catch(hemera._defer.reject)
} else if (hemera._actCallback) {
const res = hemera._actCallback(err, result)
hemera._defer.resolve(res)
} else {
if (err) {
hemera._defer.reject(err)
} else {
if (err) {
reject(err)
} else {
resolve(result)
}
hemera._defer.resolve(result)
}
}
})
}
ctx._extensions.onClientPreRequest.dispatch(ctx, (err) => onPreRequest(ctx, err))
// Execute onClientPreRequest extension
hemera._series(hemera, hemera._ext['onClientPreRequest'], [hemera], err =>
hemera._onPreRequestCompleted(err)
)
return promise
return hemera._defer.promise
}
/**
*
*
* @param {any} err
* @returns
* @memberof Hemera
*/
getRootError(err) {
let error = null
if (err instanceof SuperError) {
error = err.rootCause || err.cause || err
} else {
error = err
}
return error
}
/**
*
*
* @param {any} err
* @memberof Hemera
*/
_onPreRequestCompleted(err) {
const self = this
let m = self._encoderPipeline.run(self._message, self)
// encoding issue
if (m.error) {
let error = new Errors.ParseError(
Constants.PAYLOAD_PARSING_ERROR
).causedBy(m.error)
self.log.error(error)
self.emit('clientResponseError', error)
self._execute(error)
return
}
if (err) {
let error = self.getRootError(err)
const internalError = new Errors.HemeraError(
Constants.EXTENSION_ERROR
).causedBy(err)
self.log.error(internalError)
self.emit('clientResponseError', error)
self._execute(error)
return
}
self._request.payload = m.value
self._request.error = m.error
// use simple publish mechanism instead of request/reply
if (self._pattern.pubsub$ === true) {
if (self._actCallback) {
self.log.info(Constants.PUB_CALLBACK_REDUNDANT)
}
self._transport.send(self._pattern.topic, self._request.payload)
} else {
const optOptions = {}
// limit on the number of responses the requestor may receive
if (self._pattern.maxMessages$ > 0) {
optOptions.max = self._pattern.maxMessages$
} else if (self._pattern.maxMessages$ !== -1) {
optOptions.max = 1
}
// send request
self._sid = self._transport.sendRequest(
self._pattern.topic,
self._request.payload,
optOptions,
resp => self._sendRequestHandler(resp)
)
// handle timeout
self.handleTimeout()
}
}
/**
* Handle the timeout when a pattern could not be resolved. Can have different reasons:

@@ -1086,3 +1327,3 @@ * - No one was connected at the time (service unavailable)

*/
handleTimeout () {
handleTimeout() {
const self = this

@@ -1092,7 +1333,14 @@ const timeout = self._pattern.timeout$ || this._config.timeout

let timeoutHandler = () => {
const error = new Errors.TimeoutError(Constants.ACT_TIMEOUT_ERROR, self.errorDetails)
const error = new Errors.TimeoutError(
Constants.ACT_TIMEOUT_ERROR,
self.errorDetails
)
self.log.error(error)
self._response.error = error
self.emit('clientResponseError', error)
self._extensions.onClientPostRequest.dispatch(self, (err) => onClientTimeoutPostRequest(self, err))
// Execute onClientPostRequest extension
self._series(self, self._ext['onClientPostRequest'], [self], err =>
self._onClientTimeoutPostRequestCompleted(err)
)
}

@@ -1104,2 +1352,39 @@

/**
*
*
* @param {any} err
* @memberof Hemera
*/
_onClientTimeoutPostRequestCompleted(err) {
const self = this
if (err) {
let error = self.getRootError(err)
const internalError = new Errors.HemeraError(
Constants.EXTENSION_ERROR
).causedBy(err)
self.log.error(internalError)
self._response.error = error
self.emit('clientResponseError', error)
}
try {
self._execute(self._response.error)
} catch (err) {
let error = self.getRootError(err)
const internalError = new Errors.FatalError(
Constants.FATAL_ERROR,
self.errorDetails
).causedBy(err)
self.log.fatal(internalError)
self.emit('clientResponseError', error)
// let it crash
if (self._config.crashOnFatal) {
self.fatal()
}
}
}
/**
* Create new instance of hemera but based on the current prototype

@@ -1112,8 +1397,8 @@ * so we are able to create a scope per act without lossing the reference to the core api.

*/
createContext () {
createContext() {
const self = this
const ctx = Object.create(self)
const hemera = Object.create(self)
return ctx
return hemera
}

@@ -1130,3 +1415,3 @@

*/
list (pattern, options) {
list(pattern, options) {
return this._router.list(pattern, options)

@@ -1140,3 +1425,3 @@ }

*/
removeAll () {
removeAll() {
_.each(this._topics, (val, key) => this.remove(key))

@@ -1146,10 +1431,46 @@ }

/**
* Gracefully shutdown of all resources.
* Unsubscribe all subscriptiuons and close the underlying NATS connection
*
*
* @memberof Hemera
*/
close(cb) {
this.shutdown((err, instance, done) => {
instance._onClose(err, err => {
done(err)
cb(err)
})
})
}
/**
*
*
* @param {any} err
* @param {any} cb
* @memberof Hemera
*/
close (cb) {
this._extensions.onClose.dispatch(this, (err, val) => onClose(this, err, val, cb))
_onClose(err, cb) {
const self = this
// remove all active subscriptions
self.removeAll()
// Waiting before all queued messages was proceed
// and then close hemera and nats
self._transport.flush(() => {
self._heavy.stop()
// Does not throw an issue when connection is not available
self._transport.close()
if (err) {
self.log.error(err)
self.emit('error', err)
if (_.isFunction(cb)) {
cb(err)
}
} else {
if (_.isFunction(cb)) {
cb()
}
}
})
}

@@ -1156,0 +1477,0 @@ }

@@ -12,2 +12,4 @@ 'use strict'

const Constants = require('./constants')
/**

@@ -23,10 +25,12 @@ *

* @param {any} response
* @param {any} extensionCallback
* @param {any} hemera
*
* @memberof Reply
*/
constructor (request, response, extensionCallback) {
constructor(request, response, logger) {
this._request = request
this._response = response
this.extensionCallback = extensionCallback
this.log = logger
this.sent = false
this._errored = false
}

@@ -41,3 +45,3 @@

*/
set payload (value) {
set payload(value) {
this._response.payload = value

@@ -53,3 +57,3 @@ }

*/
get payload () {
get payload() {
return this._response.payload

@@ -59,8 +63,14 @@ }

/**
* Set the response error
* Error can not be set twice
*
*
*
* @memberof Reply
*/
set error (value) {
set error(value) {
if (this._errored) {
this.log.debug(new Error(Constants.REPLY_ERROR_ALREADY_SET))
return
}
this._errored = true
this._response.error = value

@@ -76,3 +86,3 @@ }

*/
get error () {
get error() {
return this._response.error

@@ -82,28 +92,23 @@ }

/**
* Abort the current request and respond wih the passed value
* Set the response error
*
* @param {any} value
*
* @param {any} msg
* @memberof Reply
*/
end (value) {
if (value instanceof Error) {
this.extensionCallback(value)
} else {
this.extensionCallback(null, value, true)
send(msg) {
const self = this
if (self.sent) {
self.log.warn(new Error(Constants.REPLY_ALREADY_SENT))
return
}
}
/**
* Runs through all extensions and keep the passed value to respond it
*
* @param {any} value
*
* @memberof Reply
*/
send (value) {
if (value instanceof Error) {
this.extensionCallback(value)
} else {
this.extensionCallback(null, value)
self.sent = true
if (msg) {
if (msg instanceof Error) {
self.error = msg
} else {
self.payload = msg
}
}

@@ -110,0 +115,0 @@ }

@@ -12,7 +12,7 @@ 'use strict'

function nanoToMsString (val) {
function nanoToMsString(val) {
return (val / 1e6).toFixed(2) + 'ms'
}
function inbound (ctx) {
function inbound(ctx) {
return {

@@ -25,3 +25,3 @@ id: ctx.request$.id,

function outbound (ctx) {
function outbound(ctx) {
return {

@@ -28,0 +28,0 @@ id: ctx._message.request.id,

@@ -23,5 +23,4 @@ 'use strict'

*/
constructor (payload) {
constructor(payload) {
this._request = {}
this._locals = {}
this.payload = payload

@@ -37,3 +36,3 @@ }

*/
get payload () {
get payload() {
return this._request.value

@@ -46,17 +45,6 @@ }

* @readonly
*
* @memberof ServerRequest
*/
get locals () {
return this._locals
}
/**
*
*
* @readonly
* @type {*}
* @memberOf ServerRequest
*/
get error () {
get error() {
return this._request.error

@@ -71,3 +59,3 @@ }

*/
set payload (value) {
set payload(value) {
this._request.value = value

@@ -82,3 +70,3 @@ }

*/
set error (error) {
set error(error) {
this._request.error = error

@@ -85,0 +73,0 @@ }

@@ -12,2 +12,4 @@ 'use strict'

const SuperError = require('super-error')
/**

@@ -24,3 +26,3 @@ * @class ServerResponse

*/
constructor () {
constructor() {
this._response = {}

@@ -36,3 +38,3 @@ }

*/
get payload () {
get payload() {
return this._response.value

@@ -47,3 +49,3 @@ }

*/
set payload (value) {
set payload(value) {
this._response.value = value

@@ -58,4 +60,8 @@ }

*/
set error (error) {
this._response.error = error
set error(error) {
if (error instanceof SuperError) {
this._response.error = error.rootCause || error.cause || error
} else {
this._response.error = error
}
}

@@ -70,3 +76,3 @@

*/
get error () {
get error() {
return this._response.error

@@ -73,0 +79,0 @@ }

@@ -25,3 +25,3 @@ 'use strict'

*/
constructor (params) {
constructor(params) {
this.nc = params.transport

@@ -37,3 +37,3 @@ }

*/
get driver () {
get driver() {
return this.nc

@@ -49,3 +49,3 @@ }

*/
timeout () {
timeout() {
return this.nc.timeout.apply(this.nc, arguments)

@@ -61,3 +61,3 @@ }

*/
send () {
send() {
return this.nc.publish.apply(this.nc, arguments)

@@ -73,3 +73,3 @@ }

*/
close () {
close() {
return this.nc.close.apply(this.nc, arguments)

@@ -84,3 +84,3 @@ }

*/
flush () {
flush() {
return this.nc.flush.apply(this.nc, arguments)

@@ -96,3 +96,3 @@ }

*/
subscribe () {
subscribe() {
return this.nc.subscribe.apply(this.nc, arguments)

@@ -108,3 +108,3 @@ }

*/
unsubscribe () {
unsubscribe() {
return this.nc.unsubscribe.apply(this.nc, arguments)

@@ -120,3 +120,3 @@ }

*/
sendRequest () {
sendRequest() {
return this.nc.request.apply(this.nc, arguments)

@@ -123,0 +123,0 @@ }

@@ -13,6 +13,7 @@ 'use strict'

const _ = require('lodash')
const Co = require('co')
const lut = []
for (let i = 0; i < 256; i++) { lut[i] = (i < 16 ? '0' : '') + (i).toString(16) }
for (let i = 0; i < 256; i++) {
lut[i] = (i < 16 ? '0' : '') + i.toString(16)
}

@@ -32,3 +33,3 @@ /**

*/
static natsWildcardToRegex (subject) {
static natsWildcardToRegex(subject) {
let hasTokenWildcard = subject.indexOf('*') > -1

@@ -49,6 +50,2 @@ let hasFullWildcard = subject.indexOf('>') > -1

/**
* Convert a generator or async function
* to promise factory function and call the last
* argument as callback
*
* @static

@@ -58,19 +55,12 @@ * @param {any} handler

*/
static toPromiseFact (handler) {
if (Util.isGeneratorFunction(handler)) {
return function () {
static toPromiseFact(handler) {
if (Util.isAsyncFunction(handler)) {
return function() {
// -1 because (req, res, next)
const next = arguments[arguments.length - 1]
return Co(handler.apply(null, arguments))
return handler
.apply(null, arguments)
.then(x => next(null, x))
.catch(next)
}
} else if (Util.isAsyncFunction(handler)) {
return function () {
// -1 because (req, res, next)
const next = arguments[arguments.length - 1]
return handler.apply(null, arguments)
.then(x => next(null, x))
.catch(next)
}
} else {

@@ -85,11 +75,25 @@ return handler

*/
static randomId () {
const d0 = Math.random() * 0xffffffff | 0
const d1 = Math.random() * 0xffffffff | 0
const d2 = Math.random() * 0xffffffff | 0
const d3 = Math.random() * 0xffffffff | 0
return lut[d0 & 0xff] + lut[d0 >> 8 & 0xff] + lut[d0 >> 16 & 0xff] + lut[d0 >> 24 & 0xff] +
lut[d1 & 0xff] + lut[d1 >> 8 & 0xff] + lut[d1 >> 16 & 0x0f | 0x40] + lut[d1 >> 24 & 0xff] +
lut[d2 & 0x3f | 0x80] + lut[d2 >> 8 & 0xff] + lut[d2 >> 16 & 0xff] + lut[d2 >> 24 & 0xff] +
lut[d3 & 0xff] + lut[d3 >> 8 & 0xff] + lut[d3 >> 16 & 0xff] + lut[d3 >> 24 & 0xff]
static randomId() {
const d0 = (Math.random() * 0xffffffff) | 0
const d1 = (Math.random() * 0xffffffff) | 0
const d2 = (Math.random() * 0xffffffff) | 0
const d3 = (Math.random() * 0xffffffff) | 0
return (
lut[d0 & 0xff] +
lut[(d0 >> 8) & 0xff] +
lut[(d0 >> 16) & 0xff] +
lut[(d0 >> 24) & 0xff] +
lut[d1 & 0xff] +
lut[(d1 >> 8) & 0xff] +
lut[((d1 >> 16) & 0x0f) | 0x40] +
lut[(d1 >> 24) & 0xff] +
lut[(d2 & 0x3f) | 0x80] +
lut[(d2 >> 8) & 0xff] +
lut[(d2 >> 16) & 0xff] +
lut[(d2 >> 24) & 0xff] +
lut[d3 & 0xff] +
lut[(d3 >> 8) & 0xff] +
lut[(d3 >> 16) & 0xff] +
lut[(d3 >> 24) & 0xff]
)
}

@@ -107,3 +111,3 @@

*/
static serial (array, method, callback) {
static eachSeries(array, method, callback) {
if (!array.length) {

@@ -113,4 +117,4 @@ callback()

let i = 0
const iterate = function () {
const done = function (err) {
const iterate = function() {
const done = function(err) {
if (err) {

@@ -135,41 +139,2 @@ callback(err)

/**
* Executes a series of callbacks and allows to interrupt
* as well as to continue with a final value
*
* @param {Array<Function>} array
* @param {Function} method
* @param {Function} callback
*
* @memberOf Extension
*/
static serialWithCancellation (array, method, callback) {
if (!array.length) {
callback()
} else {
let i = 0
const iterate = function () {
const done = function (err, value, abort) {
if (err) {
callback(err)
} else if (value && abort) {
callback(null, value)
} else {
i = i + 1
if (i < array.length) {
iterate(value)
} else {
callback(null, value)
}
}
}
method(array[i], done)
}
iterate()
}
}
/**
* Get high resolution time in nanoseconds

@@ -182,5 +147,5 @@ *

*/
static nowHrTime () {
static nowHrTime() {
const hrtime = process.hrtime()
return ((+hrtime[0]) * 1e9) + (+hrtime[1])
return +hrtime[0] * 1e9 + +hrtime[1]
}

@@ -196,6 +161,6 @@ /**

*/
static extractSchema (obj) {
static extractSchema(obj) {
if (obj === null) return obj
return _.pickBy(obj, function (val, prop) {
return _.pickBy(obj, function(val, prop) {
return _.isObject(val)

@@ -211,7 +176,7 @@ })

*/
static cleanPattern (obj) {
static cleanPattern(obj) {
if (obj === null) return obj
return _.pickBy(obj, function (val, prop) {
return !_.includes(prop, '$') && !_.isObject(val)
return _.pickBy(obj, function(val, prop) {
return (!_.includes(prop, '$') && !_.isObject(val)) || _.isRegExp(val)
})

@@ -227,6 +192,6 @@ }

*/
static cleanFromSpecialVars (obj) {
static cleanFromSpecialVars(obj) {
if (obj === null) return obj
return _.pickBy(obj, function (val, prop) {
return _.pickBy(obj, function(val, prop) {
return !_.includes(prop, '$')

@@ -242,3 +207,3 @@ })

*/
static pattern (args) {
static pattern(args) {
if (_.isString(args)) {

@@ -250,4 +215,4 @@ return args

let sb = []
_.each(args, function (v, k) {
if (!~k.indexOf('$') && !_.isFunction(v) && !_.isObject(v)) {
_.each(args, function(v, k) {
if ((!~k.indexOf('$') && !_.isFunction(v) && !_.isObject(v)) || _.isRegExp(v)) {
sb.push(k + ':' + v)

@@ -262,3 +227,3 @@ }

/**
/*
*

@@ -269,6 +234,5 @@ *

* @returns
*
* @memberof Util
*/
static isGeneratorFunction (obj) {
static isAsyncFunction(obj) {
var constructor = obj.constructor

@@ -278,3 +242,6 @@ if (!constructor) {

}
if (constructor.name === 'GeneratorFunction' || constructor.displayName === 'GeneratorFunction') {
if (
constructor.name === 'AsyncFunction' ||
constructor.displayName === 'AsyncFunction'
) {
return true

@@ -284,23 +251,4 @@ }

}
/**
*
*
* @static
* @param {any} obj
* @returns
* @memberof Util
*/
static isAsyncFunction (obj) {
var constructor = obj.constructor
if (!constructor) {
return false
}
if (constructor.name === 'AsyncFunction' || constructor.displayName === 'AsyncFunction') {
return true
}
return false
}
}
module.exports = Util
{
"name": "nats-hemera",
"author": "Dustin Deus (https://github.com/StarpTech)",
"version": "1.6.2",
"version": "2.0.0-0",
"main": "lib/index.js",

@@ -43,11 +43,13 @@ "homepage": "https://hemerajs.github.io/hemera/",

"dependencies": {
"bloomrun": "4.0.0",
"co": "4.6.x",
"avvio": "2.2.x",
"bloomrun": "4.0.x",
"errio": "1.2.x",
"fast-safe-stringify": "1.2.x",
"fastparallel": "2.3.x",
"fastseries": "1.7.x",
"heavy": "4.0.x",
"hoek": "4.2.x",
"joi": "11.0.x",
"joi": "11.1.x",
"lodash": "4.17.x",
"p-defer": "1.0.x",
"pino": "4.7.x",

@@ -54,0 +56,0 @@ "super-error": "2.1.x",

# Hemera package
[![npm](https://img.shields.io/npm/v/nats-hemera.svg?maxAge=3600)](https://www.npmjs.com/package/nats-hemera)
[![js-standard-style](https://img.shields.io/badge/code%20style-standard-brightgreen.svg)](http://standardjs.com)
[![styled with prettier](https://img.shields.io/badge/styled_with-prettier-ff69b4.svg)](#badge)
This is the core package of hemera. For more details see [Hemera](https://github.com/hemerajs/hemera)

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc