nats-hemera
Advanced tools
Comparing version 1.6.2 to 2.0.0-0
@@ -13,3 +13,2 @@ 'use strict' | ||
const _ = require('lodash') | ||
const Co = require('co') | ||
const Util = require('./util') | ||
@@ -30,3 +29,3 @@ | ||
*/ | ||
constructor (actMeta, options) { | ||
constructor(actMeta, options) { | ||
this.actMeta = actMeta | ||
@@ -45,3 +44,3 @@ this.options = options | ||
*/ | ||
_use (handler) { | ||
_use(handler) { | ||
this.actMeta.middleware.push(Util.toPromiseFact(handler)) | ||
@@ -58,3 +57,3 @@ } | ||
*/ | ||
use (handler) { | ||
use(handler) { | ||
if (_.isArray(handler)) { | ||
@@ -75,3 +74,3 @@ handler.forEach(h => this._use(h)) | ||
*/ | ||
end (cb) { | ||
end(cb) { | ||
this.action = cb | ||
@@ -89,6 +88,10 @@ } | ||
*/ | ||
dispatch (request, response, cb) { | ||
Util.serial(this.middleware, (item, next) => { | ||
item(request, response, next) | ||
}, cb) | ||
run(request, response, cb) { | ||
Util.eachSeries( | ||
this.middleware, | ||
(item, next) => { | ||
item(request, response, next) | ||
}, | ||
cb | ||
) | ||
} | ||
@@ -103,3 +106,3 @@ | ||
*/ | ||
get middleware () { | ||
get middleware() { | ||
return this.actMeta.middleware | ||
@@ -114,3 +117,3 @@ } | ||
*/ | ||
get schema () { | ||
get schema() { | ||
return this.actMeta.schema | ||
@@ -126,3 +129,3 @@ } | ||
*/ | ||
get pattern () { | ||
get pattern() { | ||
return this.actMeta.pattern | ||
@@ -137,7 +140,4 @@ } | ||
*/ | ||
set action (action) { | ||
if (Util.isGeneratorFunction(action)) { | ||
this.actMeta.action = Co.wrap(action) | ||
this.isPromisable = true | ||
} else if (Util.isAsyncFunction(action)) { | ||
set action(action) { | ||
if (Util.isAsyncFunction(action)) { | ||
this.actMeta.action = action | ||
@@ -157,3 +157,3 @@ this.isPromisable = true | ||
*/ | ||
get action () { | ||
get action() { | ||
return this.actMeta.action | ||
@@ -168,3 +168,3 @@ } | ||
*/ | ||
get plugin () { | ||
get plugin() { | ||
return this.actMeta.plugin | ||
@@ -171,0 +171,0 @@ } |
@@ -23,3 +23,3 @@ 'use strict' | ||
*/ | ||
constructor () { | ||
constructor() { | ||
this._request = {} | ||
@@ -35,3 +35,3 @@ } | ||
*/ | ||
get payload () { | ||
get payload() { | ||
return this._request.value | ||
@@ -47,3 +47,3 @@ } | ||
*/ | ||
get error () { | ||
get error() { | ||
return this._request.error | ||
@@ -58,3 +58,3 @@ } | ||
*/ | ||
set payload (value) { | ||
set payload(value) { | ||
this._request.value = value | ||
@@ -69,3 +69,3 @@ } | ||
*/ | ||
set error (error) { | ||
set error(error) { | ||
this._request.error = error | ||
@@ -72,0 +72,0 @@ } |
@@ -22,3 +22,3 @@ 'use strict' | ||
*/ | ||
constructor () { | ||
constructor() { | ||
this._response = {} | ||
@@ -34,3 +34,3 @@ } | ||
*/ | ||
get payload () { | ||
get payload() { | ||
return this._response.value | ||
@@ -45,3 +45,3 @@ } | ||
*/ | ||
set payload (value) { | ||
set payload(value) { | ||
this._response.value = value | ||
@@ -56,3 +56,3 @@ } | ||
*/ | ||
set error (error) { | ||
set error(error) { | ||
this._response.error = error | ||
@@ -68,3 +68,3 @@ } | ||
*/ | ||
get error () { | ||
get error() { | ||
return this._response.error | ||
@@ -71,0 +71,0 @@ } |
@@ -22,3 +22,3 @@ 'use strict' | ||
*/ | ||
constructor (type) { | ||
constructor(type) { | ||
this._type = type | ||
@@ -34,3 +34,3 @@ this._stack = [] | ||
*/ | ||
add (step) { | ||
add(step) { | ||
this._stack.push(step) | ||
@@ -47,3 +47,3 @@ return this | ||
*/ | ||
reset (step) { | ||
reset(step) { | ||
this._stack = step ? [step] : [] | ||
@@ -60,3 +60,3 @@ return this | ||
*/ | ||
first (step) { | ||
first(step) { | ||
this._stack.unshift(step) | ||
@@ -71,3 +71,3 @@ return this | ||
*/ | ||
run (msg, ctx) { | ||
run(msg, ctx) { | ||
let firstError = null | ||
@@ -74,0 +74,0 @@ |
@@ -7,5 +7,9 @@ const Joi = require('joi') | ||
// Max execution time of a request | ||
timeout: Joi.number().integer().default(2000), | ||
timeout: Joi.number() | ||
.integer() | ||
.default(2000), | ||
// Max initialization time of a plugin | ||
pluginTimeout: Joi.number().integer().default(3000), | ||
pluginTimeout: Joi.number() | ||
.integer() | ||
.default(3000), | ||
tag: Joi.string().default(''), | ||
@@ -18,59 +22,76 @@ // Enables pretty log formatter in Pino default logger | ||
crashOnFatal: Joi.boolean().default(true), | ||
logLevel: Joi.any().valid(['fatal', 'error', 'warn', 'info', 'debug', 'trace', 'silent']).default('silent'), | ||
logLevel: Joi.any() | ||
.valid(['fatal', 'error', 'warn', 'info', 'debug', 'trace', 'silent']) | ||
.default('silent'), | ||
// Create a child logger per section / plugin. Only possible with default logger Pino. | ||
childLogger: Joi.boolean().default(false), | ||
// Max recursive method calls | ||
maxRecursion: Joi.number().integer().default(0), | ||
maxRecursion: Joi.number() | ||
.integer() | ||
.default(0), | ||
// Custom logger | ||
logger: Joi.object().optional(), | ||
// The error serialization options | ||
errio: Joi.object().keys({ | ||
// Recursively serialize and deserialize nested errors | ||
recursive: Joi.boolean().default(true), | ||
// Include inherited properties | ||
inherited: Joi.boolean().default(true), | ||
// Include stack property | ||
stack: Joi.boolean().default(true), | ||
// Include properties with leading or trailing underscores | ||
private: Joi.boolean().default(false), | ||
// Property names to exclude (low priority) | ||
exclude: Joi.array().items(Joi.string()).default([]), | ||
// Property names to include (high priority) | ||
include: Joi.array().items(Joi.string()).default([]) | ||
}).default(), | ||
errio: Joi.object() | ||
.keys({ | ||
// Recursively serialize and deserialize nested errors | ||
recursive: Joi.boolean().default(true), | ||
// Include inherited properties | ||
inherited: Joi.boolean().default(true), | ||
// Include stack property | ||
stack: Joi.boolean().default(true), | ||
// Include properties with leading or trailing underscores | ||
private: Joi.boolean().default(false), | ||
// Property names to exclude (low priority) | ||
exclude: Joi.array() | ||
.items(Joi.string()) | ||
.default([]), | ||
// Property names to include (high priority) | ||
include: Joi.array() | ||
.items(Joi.string()) | ||
.default([]) | ||
}) | ||
.default(), | ||
// Pattern matching options | ||
bloomrun: Joi.object().keys({ | ||
indexing: Joi.any().valid(['insertion', 'depth']).default('insertion'), | ||
// Checks if the pattern is no duplicate based on to the indexing strategy | ||
lookupBeforeAdd: Joi.boolean().default(false) | ||
}).default(), | ||
load: Joi.object().keys({ | ||
// Check on every request (server) if the load policy was observed, | ||
checkPolicy: Joi.boolean().default(true), | ||
// Should gracefully exit the process to recover from memory leaks or load, crashOnFatal must be enabled | ||
shouldCrash: Joi.boolean().default(true), | ||
process: Joi.object().keys({ | ||
// Frequency of load sampling in milliseconds (zero is no sampling) | ||
sampleInterval: Joi.number().integer().default(0) | ||
}).default(), | ||
policy: Joi.object().keys({ | ||
// Reject requests when V8 heap is over size in bytes (zero is no max) | ||
maxHeapUsedBytes: Joi.number().integer().default(0), | ||
// Reject requests when process RSS is over size in bytes (zero is no max) | ||
maxRssBytes: Joi.number().integer().default(0), | ||
// Milliseconds of delay after which requests are rejected (zero is no max) | ||
maxEventLoopDelay: Joi.number().integer().default(0) | ||
}).default() | ||
}).default(), | ||
circuitBreaker: Joi.object().keys({ | ||
enabled: Joi.boolean().default(false), | ||
// Minimum successes in the half-open state to change to close state | ||
minSuccesses: Joi.number().integer().default(1), | ||
// The duration (milliseconds) when the server is ready to accept further calls after changing to open state | ||
halfOpenTime: Joi.number().integer().default(5000), | ||
// Frequency of reseting the circuit breaker to close state in milliseconds | ||
resetIntervalTime: Joi.number().integer().default(15000), | ||
// The threshold when the circuit breaker change to open state | ||
maxFailures: Joi.number().integer().default(3) | ||
}).default() | ||
bloomrun: Joi.object() | ||
.keys({ | ||
indexing: Joi.any() | ||
.valid(['insertion', 'depth']) | ||
.default('insertion'), | ||
// Checks if the pattern is no duplicate based on to the indexing strategy | ||
lookupBeforeAdd: Joi.boolean().default(false) | ||
}) | ||
.default(), | ||
load: Joi.object() | ||
.keys({ | ||
// Check on every request (server) if the load policy was observed, | ||
checkPolicy: Joi.boolean().default(true), | ||
// Should gracefully exit the process to recover from memory leaks or load, crashOnFatal must be enabled | ||
shouldCrash: Joi.boolean().default(true), | ||
process: Joi.object() | ||
.keys({ | ||
// Frequency of load sampling in milliseconds (zero is no sampling) | ||
sampleInterval: Joi.number() | ||
.integer() | ||
.default(0) | ||
}) | ||
.default(), | ||
policy: Joi.object() | ||
.keys({ | ||
// Reject requests when V8 heap is over size in bytes (zero is no max) | ||
maxHeapUsedBytes: Joi.number() | ||
.integer() | ||
.default(0), | ||
// Reject requests when process RSS is over size in bytes (zero is no max) | ||
maxRssBytes: Joi.number() | ||
.integer() | ||
.default(0), | ||
// Milliseconds of delay after which requests are rejected (zero is no max) | ||
maxEventLoopDelay: Joi.number() | ||
.integer() | ||
.default(0) | ||
}) | ||
.default() | ||
}) | ||
.default() | ||
}) |
@@ -16,3 +16,8 @@ 'use strict' | ||
// NATS conn error codes | ||
NATS_CONN_ERROR_CODES: ['CONN_ERR', 'SECURE_CONN_REQ_MSG', 'NON_SECURE_CONN_REQ_MSG', 'CLIENT_CERT_REQ_MSG'], | ||
NATS_CONN_ERROR_CODES: [ | ||
'CONN_ERR', | ||
'SECURE_CONN_REQ_MSG', | ||
'NON_SECURE_CONN_REQ_MSG', | ||
'CLIENT_CERT_REQ_MSG' | ||
], | ||
// NATS erros | ||
@@ -41,4 +46,2 @@ NATS_TRANSPORT_ERROR: 'Could not connect to NATS!', | ||
ADD_MIDDLEWARE_ERROR: 'Middleware error', | ||
PLUGIN_ALREADY_REGISTERED: 'Plugin was already registered', | ||
PLUGIN_ADDED: 'PLUGIN - ADDED!', | ||
PAYLOAD_VALIDATION_ERROR: 'Invalid payload', | ||
@@ -52,10 +55,12 @@ ADD_ADDED: 'ADD - ADDED', | ||
PLUGIN_NAME_REQUIRED: 'Plugin name is required', | ||
PLUGIN_REGISTRATION_ERROR: 'Error during plugin registration', | ||
DECORATION_ALREADY_DEFINED: 'Server decoration already defined', | ||
OVERRIDE_BUILTIN_METHOD_NOT_ALLOWED: 'Cannot override the built-in server interface method', | ||
MISSING_DECORATE_DEPENDENCY: 'Missing decorate dependency', | ||
DECORATION_ALREADY_DEFINED: 'Decoration has been already added', | ||
GRACEFULLY_SHUTDOWN: 'Gracefully shutdown', | ||
PLUGIN_TIMEOUT_ERROR: 'Plugin callback was not called', | ||
ACT_PATTERN_REQUIRED: 'Pattern is required to start an act call', | ||
ADD_PATTERN_REQUIRED: 'Pattern is required to define an add', | ||
NO_USE_IN_PLUGINS: 'Call `use()` inside plugins not allowed' | ||
TERMINATE_AFTER_TIMEOUT: 'Terminate process after timeout', | ||
PROCESS_TERMINATED: 'Process terminated', | ||
TRIGGERING_CLOSE_HOOK: 'Triggering close hook', | ||
REPLY_ERROR_ALREADY_SET: 'Error already set', | ||
REPLY_ALREADY_SENT: 'Reply already sent' | ||
} |
@@ -13,3 +13,3 @@ 'use strict' | ||
class Decoder { | ||
static decode (msg) { | ||
static decode(msg) { | ||
return Parse(msg) | ||
@@ -19,3 +19,3 @@ } | ||
function Parse (data) { | ||
function Parse(data) { | ||
if (!(this instanceof Parse)) { | ||
@@ -22,0 +22,0 @@ return new Parse(data) |
@@ -15,3 +15,3 @@ 'use strict' | ||
class Encoder { | ||
static encode (msg) { | ||
static encode(msg) { | ||
try { | ||
@@ -18,0 +18,0 @@ return { |
@@ -11,26 +11,66 @@ 'use strict' | ||
*/ | ||
const BaseExtension = require('./baseExtension') | ||
const _ = require('lodash') | ||
const Constants = require('./constants') | ||
const Errors = require('./errors') | ||
/** | ||
* | ||
* | ||
* @class Extension | ||
*/ | ||
class Extension extends BaseExtension { | ||
class Extension { | ||
constructor() { | ||
this._stack = [] | ||
this._types = [ | ||
'onClientPreRequest', | ||
'onClientPostRequest', | ||
'onServerPreHandler', | ||
'onServerPreRequest', | ||
'onServerPreResponse', | ||
'onClose' | ||
] | ||
this.onClientPreRequest = [] | ||
this.onClientPostRequest = [] | ||
this.onServerPreHandler = [] | ||
this.onServerPreRequest = [] | ||
this.onServerPreResponse = [] | ||
} | ||
/** | ||
* Executes the stack of functions | ||
* | ||
* @param {any} ctx | ||
* @param {any} cb | ||
* | ||
* @param {any} handler | ||
* | ||
* @memberof Extension | ||
*/ | ||
dispatch (ctx, cb) { | ||
const each = (item, next) => { | ||
item(ctx, next) | ||
_add(type, handler) { | ||
if (this._types.indexOf(type) === -1) { | ||
let error = new Errors.HemeraError(Constants.INVALID_EXTENSION_TYPE, { | ||
type | ||
}) | ||
throw error | ||
} | ||
this.run(each, cb) | ||
this[type].push((arg, next) => { | ||
arg.push(next) | ||
handler.apply(null, arg) | ||
}) | ||
} | ||
/** | ||
* | ||
* | ||
* @param {any} handler | ||
* | ||
* @memberOf Extension | ||
*/ | ||
add(type, handler) { | ||
if (_.isArray(handler)) { | ||
handler.forEach(h => this._add(type, h)) | ||
} else { | ||
this._add(type, handler) | ||
} | ||
} | ||
} | ||
module.exports = Extension |
@@ -15,3 +15,2 @@ 'use strict' | ||
const Errors = require('./errors') | ||
const CircuitBreaker = require('./circuitBreaker') | ||
@@ -24,42 +23,45 @@ /** | ||
*/ | ||
function onClientPreRequest (ctx, next) { | ||
let pattern = ctx._pattern | ||
function onClientPreRequest(context, next) { | ||
let pattern = context._pattern | ||
let prevCtx = ctx._prevContext | ||
let cleanPattern = ctx._cleanPattern | ||
let prevCtx = context._prevContext | ||
let cleanPattern = context._cleanPattern | ||
let currentTime = Util.nowHrTime() | ||
// shared context | ||
ctx.context$ = pattern.context$ || prevCtx.context$ | ||
context.context$ = pattern.context$ || prevCtx.context$ | ||
// set metadata by passed pattern or current message context | ||
ctx.meta$ = Object.assign(pattern.meta$ || {}, ctx.meta$) | ||
context.meta$ = Object.assign(pattern.meta$ || {}, context.meta$) | ||
// is only passed by msg | ||
ctx.delegate$ = pattern.delegate$ || {} | ||
context.delegate$ = pattern.delegate$ || {} | ||
// tracing | ||
ctx.trace$ = pattern.trace$ || {} | ||
ctx.trace$.parentSpanId = ctx.trace$.spanId || prevCtx.trace$.spanId | ||
ctx.trace$.traceId = ctx.trace$.traceId || prevCtx.trace$.traceId || Util.randomId() | ||
ctx.trace$.spanId = Util.randomId() | ||
ctx.trace$.timestamp = currentTime | ||
ctx.trace$.service = pattern.topic | ||
ctx.trace$.method = Util.pattern(pattern) | ||
context.trace$ = pattern.trace$ || {} | ||
context.trace$.parentSpanId = context.trace$.spanId || prevCtx.trace$.spanId | ||
context.trace$.traceId = | ||
context.trace$.traceId || prevCtx.trace$.traceId || Util.randomId() | ||
context.trace$.spanId = Util.randomId() | ||
context.trace$.timestamp = currentTime | ||
context.trace$.service = pattern.topic | ||
context.trace$.method = Util.pattern(pattern) | ||
// detect recursion | ||
if (ctx._config.maxRecursion > 1) { | ||
const callSignature = `${ctx.trace$.traceId}:${ctx.trace$.method}` | ||
if (ctx.meta$ && ctx.meta$.referrers) { | ||
var count = ctx.meta$.referrers[callSignature] | ||
if (context._config.maxRecursion > 1) { | ||
const callSignature = `${context.trace$.traceId}:${context.trace$.method}` | ||
if (context.meta$ && context.meta$.referrers) { | ||
var count = context.meta$.referrers[callSignature] | ||
count += 1 | ||
ctx.meta$.referrers[callSignature] = count | ||
if (count > ctx._config.maxRecursion) { | ||
ctx.meta$.referrers = null | ||
return next(new Errors.MaxRecursionError({ | ||
count: --count | ||
})) | ||
context.meta$.referrers[callSignature] = count | ||
if (count > context._config.maxRecursion) { | ||
context.meta$.referrers = null | ||
return next( | ||
new Errors.MaxRecursionError({ | ||
count: --count | ||
}) | ||
) | ||
} | ||
} else { | ||
ctx.meta$.referrers = {} | ||
ctx.meta$.referrers[callSignature] = 1 | ||
context.meta$.referrers = {} | ||
context.meta$.referrers[callSignature] = 1 | ||
} | ||
@@ -71,7 +73,10 @@ } | ||
id: pattern.requestId$ || Util.randomId(), | ||
parentId: ctx.request$.id || pattern.requestParentId$, | ||
type: pattern.pubsub$ === true ? Constants.REQUEST_TYPE_PUBSUB : Constants.REQUEST_TYPE_REQUEST | ||
parentId: context.request$.id || pattern.requestParentId$, | ||
type: | ||
pattern.pubsub$ === true | ||
? Constants.REQUEST_TYPE_PUBSUB | ||
: Constants.REQUEST_TYPE_REQUEST | ||
} | ||
ctx.emit('clientPreRequest', ctx) | ||
context.emit('clientPreRequest', context) | ||
@@ -81,12 +86,12 @@ // build msg | ||
pattern: cleanPattern, | ||
meta: ctx.meta$, | ||
delegate: ctx.delegate$, | ||
trace: ctx.trace$, | ||
meta: context.meta$, | ||
delegate: context.delegate$, | ||
trace: context.trace$, | ||
request: request | ||
} | ||
ctx._message = message | ||
context._message = message | ||
ctx.log.debug({ | ||
outbound: ctx | ||
context.log.debug({ | ||
outbound: context | ||
}) | ||
@@ -101,53 +106,26 @@ | ||
* @param {any} next | ||
* @returns | ||
*/ | ||
function onClientPreRequestCircuitBreaker (ctx, next) { | ||
if (ctx._config.circuitBreaker.enabled) { | ||
// any pattern represent an own circuit breaker | ||
const circuitBreaker = ctx._circuitBreakerMap.get(ctx.trace$.method) | ||
if (!circuitBreaker) { | ||
const cb = new CircuitBreaker(ctx._config.circuitBreaker) | ||
ctx._circuitBreakerMap.set(ctx.trace$.method, cb) | ||
} else { | ||
if (!circuitBreaker.available()) { | ||
// trigger half-open timer | ||
circuitBreaker.record() | ||
return next(new Errors.CircuitBreakerError(`Circuit breaker is ${circuitBreaker.state}`, { state: circuitBreaker.state, method: ctx.trace$.method, service: ctx.trace$.service })) | ||
} | ||
} | ||
function onClientPostRequest(context, next) { | ||
let pattern = context._pattern | ||
let msg = context._response.payload | ||
next() | ||
} else { | ||
next() | ||
} | ||
} | ||
/** | ||
* | ||
* | ||
* @param {any} next | ||
*/ | ||
function onClientPostRequest (ctx, next) { | ||
let pattern = ctx._pattern | ||
let msg = ctx._response.payload | ||
// pass to act context | ||
if (msg) { | ||
ctx.request$ = msg.request || {} | ||
ctx.trace$ = msg.trace || {} | ||
ctx.meta$ = msg.meta || {} | ||
context.request$ = msg.request || {} | ||
context.trace$ = msg.trace || {} | ||
context.meta$ = msg.meta || {} | ||
} | ||
// calculate request duration | ||
let diff = Util.nowHrTime() - ctx.trace$.timestamp | ||
ctx.trace$.duration = diff | ||
let diff = Util.nowHrTime() - context.trace$.timestamp | ||
context.trace$.duration = diff | ||
ctx.request$.service = pattern.topic | ||
ctx.request$.method = ctx.trace$.method | ||
context.request$.service = pattern.topic | ||
context.request$.method = context.trace$.method | ||
ctx.log.debug({ | ||
inbound: ctx | ||
context.log.debug({ | ||
inbound: context | ||
}) | ||
ctx.emit('clientPostRequest', ctx) | ||
context.emit('clientPostRequest', context) | ||
@@ -165,7 +143,8 @@ next() | ||
*/ | ||
function onServerPreRequest (ctx, req, res, next) { | ||
let m = ctx._decoderPipeline.run(ctx._request.payload, ctx) | ||
function onServerPreRequest(context, req, res, next) { | ||
let m = context._decoderPipeline.run(context._request.payload, context) | ||
if (m.error) { | ||
return res.send(m.error) | ||
next(m.error) | ||
return | ||
} | ||
@@ -176,18 +155,18 @@ | ||
if (msg) { | ||
ctx.meta$ = msg.meta || {} | ||
ctx.trace$ = msg.trace || {} | ||
ctx.delegate$ = msg.delegate || {} | ||
ctx.request$ = msg.request || {} | ||
ctx.auth$ = {} | ||
context.meta$ = msg.meta || {} | ||
context.trace$ = msg.trace || {} | ||
context.delegate$ = msg.delegate || {} | ||
context.request$ = msg.request || {} | ||
context.auth$ = {} | ||
} | ||
ctx._request.payload = m.value | ||
ctx._request.error = m.error | ||
context._request.payload = m.value | ||
context._request.error = m.error | ||
// icnoming pattern | ||
ctx._pattern = ctx._request.payload.pattern | ||
context._pattern = context._request.payload.pattern | ||
// find matched route | ||
ctx._actMeta = ctx._router.lookup(ctx._pattern) | ||
context._actMeta = context._router.lookup(context._pattern) | ||
ctx.emit('serverPreRequest', ctx) | ||
context.emit('serverPreRequest', context) | ||
@@ -205,7 +184,7 @@ next() | ||
*/ | ||
function onServerPreRequestLoadTest (ctx, req, res, next) { | ||
if (ctx._config.load.checkPolicy) { | ||
const error = ctx._loadPolicy.check() | ||
function onServerPreRequestLoadTest(context, req, res, next) { | ||
if (context._config.load.checkPolicy) { | ||
const error = context._loadPolicy.check() | ||
if (error) { | ||
ctx._shouldCrash = ctx._config.load.shouldCrash | ||
context._shouldCrash = context._config.load.shouldCrash | ||
return next(new Errors.ProcessLoadError(error.message, error.data)) | ||
@@ -225,4 +204,4 @@ } | ||
*/ | ||
function onServerPreHandler (ctx, req, res, next) { | ||
ctx.emit('serverPreHandler', ctx) | ||
function onServerPreHandler(context, req, res, next) { | ||
context.emit('serverPreHandler', context) | ||
@@ -232,4 +211,4 @@ next() | ||
function onServerPreResponse (ctx, req, res, next) { | ||
ctx.emit('serverPreResponse', ctx) | ||
function onServerPreResponse(context, req, res, next) { | ||
context.emit('serverPreResponse', context) | ||
@@ -239,6 +218,9 @@ next() | ||
module.exports.onClientPreRequest = [onClientPreRequest, onClientPreRequestCircuitBreaker] | ||
module.exports.onClientPreRequest = [onClientPreRequest] | ||
module.exports.onClientPostRequest = [onClientPostRequest] | ||
module.exports.onServerPreRequest = [onServerPreRequest, onServerPreRequestLoadTest] | ||
module.exports.onServerPreRequest = [ | ||
onServerPreRequest, | ||
onServerPreRequestLoadTest | ||
] | ||
module.exports.onServerPreHandler = [onServerPreHandler] | ||
module.exports.onServerPreResponse = [onServerPreResponse] |
1083
lib/index.js
@@ -19,3 +19,2 @@ 'use strict' | ||
const Errio = require('errio') | ||
const Hoek = require('hoek') | ||
const Heavy = require('heavy') | ||
@@ -26,10 +25,9 @@ const _ = require('lodash') | ||
const SuperError = require('super-error') | ||
const Co = require('co') | ||
const Hoek = require('hoek') | ||
const Joi = require('joi') | ||
const Avvio = require('avvio') | ||
const Series = require('fastseries') | ||
const GracefulShutdown = require('./gracefulShutdown') | ||
const Errors = require('./errors') | ||
const Constants = require('./constants') | ||
const Extension = require('./extension') | ||
const ServerExtension = require('./serverExtension') | ||
const Util = require('./util') | ||
@@ -47,13 +45,7 @@ const NatsTransport = require('./transport') | ||
const CodecPipeline = require('./codecPipeline') | ||
const Reply = require('./reply') | ||
const Add = require('./add') | ||
const Plugin = require('./plugin') | ||
const Extension = require('./extension') | ||
const pDefer = require('p-defer') | ||
// Extension finish handler | ||
const onServerPreResponse = require('./onServerPreResponse') | ||
const onServerPreRequest = require('./onServerPreRequest') | ||
const onClientTimeoutPostRequest = require('./onClientTimeoutPostRequest') | ||
const onPreRequest = require('./onPreRequest') | ||
const onClientPostRequest = require('./onClientPostRequest') | ||
const onClose = require('./onClose') | ||
/** | ||
@@ -71,3 +63,3 @@ * @class Hemera | ||
*/ | ||
constructor (transport, params) { | ||
constructor(transport, params) { | ||
super() | ||
@@ -87,3 +79,7 @@ | ||
this._topics = {} | ||
this._exposition = {} | ||
this.plugin$ = { | ||
options: { | ||
name: 'core' | ||
} | ||
} | ||
@@ -95,8 +91,2 @@ // special variables for the new execution context | ||
this.auth$ = {} | ||
this.plugin$ = new Plugin({ | ||
options: {}, | ||
attributes: { | ||
name: 'core' | ||
} | ||
}) | ||
this.trace$ = {} | ||
@@ -120,26 +110,12 @@ this.request$ = { | ||
this._cleanPattern = '' | ||
this._pluginRegistrations = [] | ||
this._decorations = {} | ||
// create reference to root hemera instance | ||
this._root = this | ||
// contains the list of all registered plugins | ||
// the core is also a plugin | ||
this._plugins = { | ||
core: this.plugin$ | ||
core: this | ||
} | ||
// keep reference to root hemera instance | ||
this._root = this | ||
this._encoderPipeline = new CodecPipeline().add(DefaultEncoder.encode) | ||
this._decoderPipeline = new CodecPipeline().add(DefaultDecoder.decode) | ||
// define extension points | ||
this._extensions = { | ||
onClientPreRequest: new Extension('onClientPreRequest'), | ||
onClientPostRequest: new Extension('onClientPostRequest'), | ||
onServerPreHandler: new ServerExtension('onServerPreHandler'), | ||
onServerPreRequest: new ServerExtension('onServerPreRequest'), | ||
onServerPreResponse: new ServerExtension('onServerPreResponse'), | ||
onClose: new Extension('onClose') | ||
} | ||
// errio settings | ||
@@ -160,13 +136,53 @@ Errio.setDefaults(this._config.errio) | ||
// will be executed before the client request is executed. | ||
this._extensions.onClientPreRequest.add(DefaultExtensions.onClientPreRequest) | ||
// will be executed after the client has received and decoded the request | ||
this._extensions.onClientPostRequest.add(DefaultExtensions.onClientPostRequest) | ||
// will be executed before the server has received the requests | ||
this._extensions.onServerPreRequest.add(DefaultExtensions.onServerPreRequest) | ||
// will be executed before the server action is executed | ||
this._extensions.onServerPreHandler.add(DefaultExtensions.onServerPreHandler) | ||
// will be executed before the server has replied the response and build the message | ||
this._extensions.onServerPreResponse.add(DefaultExtensions.onServerPreResponse) | ||
this._ext = new Extension() | ||
this._ext.add('onClientPreRequest', DefaultExtensions.onClientPreRequest) | ||
this._ext.add('onClientPostRequest', DefaultExtensions.onClientPostRequest) | ||
this._ext.add('onServerPreRequest', DefaultExtensions.onServerPreRequest) | ||
this._ext.add('onServerPreHandler', DefaultExtensions.onServerPreHandler) | ||
this._ext.add('onServerPreResponse', DefaultExtensions.onServerPreResponse) | ||
this._avvio = Avvio(this, { | ||
expose: { | ||
use: 'register', | ||
close: 'shutdown', | ||
onClose: 'onShutdown', | ||
ready: 'bootstrap' | ||
} | ||
}) | ||
this._avvio.override = (hemera, plugin, opts) => { | ||
const res = Object.create(hemera) | ||
const proto = Object.getPrototypeOf(res) | ||
if (hemera._config.childLogger) { | ||
res.log = hemera.log.child({ plugin: opts.name }) | ||
} | ||
// plugin seperated prototype propertys | ||
res.plugin$ = { | ||
options: opts, | ||
name: opts.name | ||
} | ||
// extend prototype so that each nested plugin have access | ||
res.decorate = function decorate(prop, value, deps) { | ||
if (prop in this) { | ||
throw new Errors.HemeraError(Constants.DECORATION_ALREADY_DEFINED) | ||
} | ||
if (deps) { | ||
res._checkDecoraterDependencies(deps) | ||
} | ||
// extend prototype | ||
proto[prop] = value | ||
} | ||
this._plugins[opts.name] = res | ||
return res | ||
} | ||
this._series = Series() | ||
// use own logger | ||
@@ -178,8 +194,11 @@ if (this._config.logger) { | ||
let pretty = Pino.pretty() | ||
this.log = Pino({ | ||
name: this._config.name, | ||
safe: true, // avoid error caused by circular references | ||
level: this._config.logLevel, | ||
serializers: Serializers | ||
}, pretty) | ||
this.log = Pino( | ||
{ | ||
name: this._config.name, | ||
safe: true, // avoid error caused by circular references | ||
level: this._config.logLevel, | ||
serializers: Serializers | ||
}, | ||
pretty | ||
) | ||
@@ -199,6 +218,2 @@ // Leads to too much listeners in tests | ||
} | ||
this._gracefulShutdown = new GracefulShutdown(this.log) | ||
this._gracefulShutdown.addHandler((code, cb) => this.close(cb)) | ||
this._gracefulShutdown.init() | ||
} | ||
@@ -211,3 +226,3 @@ | ||
*/ | ||
_registerErrors () { | ||
_registerErrors() { | ||
for (var error in Hemera.errors) { | ||
@@ -223,3 +238,3 @@ Errio.register(Hemera.errors[error]) | ||
*/ | ||
get decoder () { | ||
get decoder() { | ||
return this._decoderPipeline | ||
@@ -234,3 +249,3 @@ } | ||
*/ | ||
get encoder () { | ||
get encoder() { | ||
return this._encoderPipeline | ||
@@ -240,13 +255,2 @@ } | ||
/** | ||
* Return all registered plugins | ||
* | ||
* @readonly | ||
* | ||
* @memberOf Hemera | ||
*/ | ||
get plugins () { | ||
return this._plugins | ||
} | ||
/** | ||
* Return the bloomrun instance | ||
@@ -258,3 +262,3 @@ * | ||
*/ | ||
get router () { | ||
get router() { | ||
return this._router | ||
@@ -270,3 +274,3 @@ } | ||
*/ | ||
get load () { | ||
get load() { | ||
return this._heavy.load | ||
@@ -276,13 +280,2 @@ } | ||
/** | ||
* Return the shared object of all exposed data | ||
* | ||
* @readonly | ||
* @type {Exposition} | ||
* @memberOf Hemera | ||
*/ | ||
get exposition () { | ||
return this._exposition | ||
} | ||
/** | ||
* Return the underlying NATS driver | ||
@@ -294,3 +287,3 @@ * | ||
*/ | ||
get transport () { | ||
get transport() { | ||
return this._transport.driver | ||
@@ -306,3 +299,3 @@ } | ||
*/ | ||
get topics () { | ||
get topics() { | ||
return this._topics | ||
@@ -318,3 +311,3 @@ } | ||
*/ | ||
get config () { | ||
get config() { | ||
return this._config | ||
@@ -330,3 +323,3 @@ } | ||
*/ | ||
get errorDetails () { | ||
get errorDetails() { | ||
if (this._isServer) { | ||
@@ -355,3 +348,3 @@ return { | ||
*/ | ||
static get errors () { | ||
static get errors() { | ||
return Errors | ||
@@ -367,3 +360,3 @@ } | ||
*/ | ||
static createError (name) { | ||
static createError(name) { | ||
const ctor = SuperError.subclass(name) | ||
@@ -376,22 +369,2 @@ // Register the class with Errio. | ||
/** | ||
* Exposed data in context of the current plugin | ||
* It is accessible by this.expositions[<plugin>][<key>] | ||
* | ||
* @param {string} key | ||
* @param {mixed} object | ||
* | ||
* @memberOf Hemera | ||
*/ | ||
expose (key, object) { | ||
let pluginName = this.plugin$.attributes.name | ||
if (!this._exposition[pluginName]) { | ||
this._exposition[pluginName] = {} | ||
this._exposition[pluginName][key] = object | ||
} else { | ||
this._exposition[pluginName][key] = object | ||
} | ||
} | ||
/** | ||
* Add an extension. Extensions are called in serie | ||
@@ -404,71 +377,18 @@ * | ||
*/ | ||
ext (type, handler) { | ||
if (!this._extensions[type]) { | ||
let error = new Errors.HemeraError(Constants.INVALID_EXTENSION_TYPE, { | ||
type | ||
}) | ||
this.log.error(error) | ||
throw error | ||
ext(type, handler) { | ||
if (type === 'onClose') { | ||
this.onShutdown(handler) | ||
} else { | ||
this._ext.add(type, handler) | ||
} | ||
this._extensions[type].add(handler) | ||
} | ||
/** | ||
* Use a plugin. | ||
* | ||
* @param {any} plugin | ||
* | ||
* @memberOf Hemera | ||
* @readonly | ||
* @memberof Hemera | ||
*/ | ||
use (params, options) { | ||
// use plugin infos from package.json | ||
if (_.isObject(params.attributes.pkg)) { | ||
params.attributes = params.attributes || {} | ||
params.attributes = Hoek.applyToDefaults(params.attributes, _.pick(params.attributes.pkg, ['name', 'description', 'version'])) | ||
} | ||
let pluginOptions = {} | ||
// pass options as second argument during plugin registration | ||
if (_.isObject(options)) { | ||
pluginOptions = Hoek.clone(params.options) || {} | ||
pluginOptions = Hoek.applyToDefaults(pluginOptions, options) | ||
} else if (params.options) { | ||
pluginOptions = Hoek.clone(params.options) | ||
} | ||
// plugin name is required | ||
if (!params.attributes.name) { | ||
let error = new Errors.HemeraError(Constants.PLUGIN_NAME_REQUIRED) | ||
this.log.error(error) | ||
this.emit('error', error) | ||
return | ||
} | ||
// create new execution context | ||
let ctx = this.createContext() | ||
const plugin = new Plugin({ | ||
register: params.plugin.bind(ctx), | ||
attributes: params.attributes, | ||
parentPluginName: this.plugin$.attributes.name, | ||
options: pluginOptions | ||
}) | ||
ctx.plugin$ = plugin | ||
if (ctx._config.childLogger) { | ||
ctx.log = this.log.child({ plugin: plugin.attributes.name }) | ||
} | ||
ctx.use = () => { | ||
ctx.log.error(Constants.NO_USE_IN_PLUGINS) | ||
throw new Errors.HemeraError(Constants.NO_USE_IN_PLUGINS) | ||
} | ||
this._pluginRegistrations.push(plugin) | ||
this.log.info(params.attributes.name, Constants.PLUGIN_ADDED) | ||
this._plugins[params.attributes.name] = plugin | ||
get plugins() { | ||
return this._plugins | ||
} | ||
@@ -484,3 +404,3 @@ | ||
*/ | ||
setOption (key, value) { | ||
setOption(key, value) { | ||
this.plugin$.options[key] = value | ||
@@ -495,3 +415,3 @@ } | ||
*/ | ||
setConfig (key, value) { | ||
setConfig(key, value) { | ||
this._config[key] = value | ||
@@ -501,2 +421,12 @@ } | ||
/** | ||
* Return the root instance | ||
* | ||
* @returns | ||
* @memberof Hemera | ||
*/ | ||
root() { | ||
return this._root | ||
} | ||
/** | ||
* Exit the process | ||
@@ -506,9 +436,20 @@ * | ||
*/ | ||
fatal () { | ||
this._gracefulShutdown.shutdown('fatal') | ||
fatal() { | ||
process.exit(1) | ||
} | ||
/** | ||
* | ||
* | ||
* @param {any} prop | ||
* @returns | ||
* @memberof Hemera | ||
*/ | ||
hasDecorator(prop) { | ||
return prop in this | ||
} | ||
/** | ||
* Decorate the root instance | ||
* Value is globaly accesible | ||
* Value is globaly accessible | ||
* | ||
@@ -520,30 +461,26 @@ * @param {any} prop | ||
*/ | ||
decorate (prop, value) { | ||
if (this._decorations[prop]) { | ||
throw new Error(Constants.DECORATION_ALREADY_DEFINED) | ||
} else if (this[prop]) { | ||
throw new Error(Constants.OVERRIDE_BUILTIN_METHOD_NOT_ALLOWED) | ||
decorate(prop, value, deps) { | ||
if (this[prop]) { | ||
throw new Errors.HemeraError(Constants.DECORATION_ALREADY_DEFINED) | ||
} | ||
this._decorations[prop] = { | ||
plugin: this.plugin$, | ||
value | ||
if (deps) { | ||
this._checkDecoraterDependencies(deps) | ||
} | ||
// decorate root hemera instance | ||
this._root[prop] = value | ||
this[prop] = value | ||
} | ||
/** | ||
* Create a custom super error object in a running hemera instance | ||
* | ||
* @param {any} name | ||
* @returns | ||
* | ||
* @memberOf Hemera | ||
* @param {any} deps | ||
* @memberof Hemera | ||
*/ | ||
createError (name) { | ||
const ctor = SuperError.subclass(name) | ||
// Register the class with Errio. | ||
Errio.register(ctor) | ||
return ctor | ||
_checkDecoraterDependencies(deps) { | ||
for (var i = 0; i < deps.length; i++) { | ||
if (!(deps in this)) { | ||
throw new Error(Constants.MISSING_DECORATE_DEPENDENCY) | ||
} | ||
} | ||
} | ||
@@ -554,45 +491,61 @@ | ||
* | ||
* @param {any} plugins | ||
* @returns | ||
* @memberof Hemera | ||
*/ | ||
_use(plugin, opts, cb) { | ||
const pluginOpts = Hoek.clone(plugin.options || {}) | ||
const options = Hoek.applyToDefaults(pluginOpts, opts || {}, true) | ||
if (!options.name) { | ||
throw new Errors.HemeraError(Constants.PLUGIN_NAME_REQUIRED) | ||
} | ||
this.register(plugin.plugin, options, cb) | ||
} | ||
/** | ||
* | ||
* | ||
* @param {any} plugin | ||
* @param {any} opts | ||
* @param {any} cb | ||
* @returns | ||
* @memberof Hemera | ||
*/ | ||
registerPlugins (cb) { | ||
const each = (item, next) => { | ||
// plugin has no callback | ||
if (item.register.length < 2) { | ||
item.register(item.options) | ||
return next() | ||
} | ||
use(plugin, opts, cb) { | ||
// when opts is omitted | ||
if (_.isFunction(opts)) { | ||
cb = opts | ||
opts = {} | ||
} | ||
// Detect plugin timeouts | ||
const pluginTimer = setTimeout(() => { | ||
const internalError = new Errors.PluginTimeoutError(Constants.PLUGIN_TIMEOUT_ERROR) | ||
this.log.error(internalError, `Plugin: ${item.attributes.name}`) | ||
next(internalError) | ||
}, this._config.pluginTimeout) | ||
item.register(item.options, (err) => { | ||
clearTimeout(pluginTimer) | ||
next(err) | ||
if (_.isArray(plugin)) { | ||
plugin.forEach(p => { | ||
this._use(p, opts, cb) | ||
}) | ||
} else { | ||
this._use(plugin, opts, cb) | ||
} | ||
// register all plugins in serie | ||
Util.serial(this._pluginRegistrations, each, (err) => { | ||
if (err) { | ||
if (err instanceof SuperError) { | ||
err = err.rootCause || err.cause || err | ||
} | ||
const internalError = new Errors.HemeraError(Constants.PLUGIN_REGISTRATION_ERROR).causedBy(err) | ||
this.log.error(internalError) | ||
this.emit('error', internalError) | ||
} else if (_.isFunction(cb)) { | ||
cb.call(this) | ||
} | ||
}) | ||
return this._avvio | ||
} | ||
/** | ||
* Create a custom super error object in a running hemera instance | ||
* | ||
* @param {any} name | ||
* @returns | ||
* | ||
* @memberOf Hemera | ||
*/ | ||
createError(name) { | ||
const ctor = SuperError.subclass(name) | ||
// Register the class with Errio. | ||
Errio.register(ctor) | ||
return ctor | ||
} | ||
/** | ||
* | ||
* | ||
* @param {Function} cb | ||
@@ -602,6 +555,6 @@ * | ||
*/ | ||
ready (cb) { | ||
this._transport.driver.on('error', (err) => { | ||
ready(cb) { | ||
this._transport.driver.on('error', err => { | ||
this.log.error(err, Constants.NATS_TRANSPORT_ERROR) | ||
this.log.error('NATS Code: \'%s\', Message: %s', err.code, err.message) | ||
this.log.error("NATS Code: '%s', Message: %s", err.code, err.message) | ||
@@ -616,3 +569,3 @@ // Exit only on connection issues. | ||
this._transport.driver.on('permission_error', (err) => { | ||
this._transport.driver.on('permission_error', err => { | ||
this.log.error(err, Constants.NATS_PERMISSION_ERROR) | ||
@@ -639,3 +592,7 @@ }) | ||
this.log.info(Constants.NATS_TRANSPORT_CONNECTED) | ||
this.registerPlugins(cb) | ||
this.bootstrap(err => { | ||
if (_.isFunction(cb)) { | ||
cb(err) | ||
} | ||
}) | ||
}) | ||
@@ -645,49 +602,111 @@ } | ||
/** | ||
* Build the final payload for the response | ||
* Last step before the response is send to the callee. | ||
* The preResponse extension is dispatched and previous errors are evaluated. | ||
* | ||
* | ||
* @memberOf Hemera | ||
*/ | ||
_buildMessage () { | ||
let result = this._response | ||
finish() { | ||
const self = this | ||
const args = [self, self._request, self._reply] | ||
self._series(self, self._ext['onServerPreResponse'], args, err => | ||
self._onServerPreResponseCompleted(err) | ||
) | ||
} | ||
/** | ||
* | ||
* | ||
* @param {any} extensionError | ||
* @memberof Hemera | ||
*/ | ||
_onServerPreResponseCompleted(extensionError) { | ||
const self = this | ||
// check if any error was set before | ||
if (extensionError) { | ||
self._reply.error = extensionError | ||
const internalError = new Errors.HemeraError( | ||
Constants.EXTENSION_ERROR, | ||
self.errorDetails | ||
).causedBy(extensionError) | ||
self.log.error(internalError) | ||
self.emit('serverResponseError', self._reply.error) | ||
} | ||
const result = self._createMessage() | ||
self._send(result) | ||
} | ||
/** | ||
* | ||
* | ||
* @param {any} msg | ||
* @memberof Hemera | ||
*/ | ||
_send(msg) { | ||
const self = this | ||
// indicates that an error occurs and that the program should exit | ||
if (self._shouldCrash) { | ||
// only when we have an inbox othwerwise exit the service immediately | ||
if (self._replyTo) { | ||
// send error back to callee | ||
self._transport.send(self._replyTo, msg, () => { | ||
// let it crash | ||
if (self._config.crashOnFatal) { | ||
self.fatal() | ||
} | ||
}) | ||
return | ||
} else if (self._config.crashOnFatal) { | ||
self.fatal() | ||
return | ||
} | ||
} | ||
// reply only when we have an inbox | ||
if (self._replyTo) { | ||
self._transport.send(self._replyTo, msg) | ||
} | ||
} | ||
/** | ||
* | ||
* | ||
* @param {any} err | ||
* @returns | ||
* @memberof Hemera | ||
*/ | ||
_createMessage() { | ||
const self = this | ||
let message = { | ||
meta: this.meta$ || {}, | ||
trace: this.trace$ || {}, | ||
request: this.request$, | ||
result: result.error ? null : result.payload, | ||
error: result.error ? Errio.toObject(result.error) : null | ||
meta: self.meta$ || {}, | ||
trace: self.trace$ || {}, | ||
request: self.request$, | ||
result: self._reply.error ? null : self._reply.payload, | ||
error: self._reply.error ? Errio.toObject(self._reply.error) : null | ||
} | ||
let m = this._encoderPipeline.run(message, this) | ||
let m = self._encoderPipeline.run(message, self) | ||
// attach encoding issues | ||
if (m.error) { | ||
let internalError = new Errors.ParseError(Constants.PAYLOAD_PARSING_ERROR).causedBy(m.error) | ||
let internalError = new Errors.ParseError( | ||
Constants.PAYLOAD_PARSING_ERROR | ||
).causedBy(m.error) | ||
message.error = Errio.toObject(internalError) | ||
message.result = null | ||
// Retry to encode with issue perhaps the reason was data related | ||
m = this._encoderPipeline.run(message, this) | ||
this.log.error(internalError) | ||
this.emit('serverResponseError', m.error) | ||
m = self._encoderPipeline.run(message, self) | ||
self.log.error(internalError) | ||
self.emit('serverResponseError', m.error) | ||
} | ||
// final response | ||
this._message = m.value | ||
return m.value | ||
} | ||
/** | ||
* Last step before the response is send to the callee. | ||
* The preResponse extension is dispatched and previous errors are evaluated. | ||
* | ||
* @memberOf Hemera | ||
*/ | ||
finish () { | ||
this._extensions.onServerPreResponse | ||
.dispatch(this, (err, val) => onServerPreResponse(this, err, val)) | ||
} | ||
/** | ||
* | ||
* | ||
* @param {any} err | ||
@@ -699,3 +718,3 @@ * @param {any} resp | ||
*/ | ||
_actionHandler (err, resp) { | ||
_actionHandler(err, resp) { | ||
const self = this | ||
@@ -718,15 +737,12 @@ | ||
if (err instanceof SuperError) { | ||
self._response.error = err.rootCause || err.cause || err | ||
} else { | ||
self._response.error = err | ||
} | ||
self._reply.error = self.getRootError(err) | ||
return self.finish() | ||
self.finish() | ||
return | ||
} | ||
// assign action result | ||
self._response.payload = resp | ||
// delete error we have payload | ||
self._response.error = null | ||
// set only when we have a result and the reply interface wasn't called | ||
if (resp && !self._reply.sent) { | ||
self._reply.payload = resp | ||
} | ||
@@ -747,3 +763,3 @@ self.finish() | ||
*/ | ||
subscribe (topic, subToMany, maxMessages, queue) { | ||
subscribe(topic, subToMany, maxMessages, queue) { | ||
const self = this | ||
@@ -759,14 +775,16 @@ | ||
// create new execution context | ||
let ctx = this.createContext() | ||
ctx._shouldCrash = false | ||
ctx._replyTo = replyTo | ||
ctx._topic = topic | ||
ctx._request = new ServerRequest(request) | ||
ctx._response = new ServerResponse() | ||
ctx._pattern = {} | ||
ctx._actMeta = {} | ||
ctx._isServer = true | ||
let hemera = self.createContext() | ||
hemera._topic = topic | ||
hemera._replyTo = replyTo | ||
hemera._request = new ServerRequest(request) | ||
hemera._response = new ServerResponse() | ||
hemera._reply = new Reply(hemera._request, hemera._response, hemera.log) | ||
hemera._pattern = {} | ||
hemera._actMeta = {} | ||
hemera._isServer = true | ||
ctx._extensions.onServerPreRequest | ||
.dispatch(ctx, (err, val) => onServerPreRequest(ctx, err, val)) | ||
const args = [hemera, hemera._request, hemera._reply] | ||
hemera._series(hemera, self._ext['onServerPreRequest'], args, err => | ||
hemera._onServerPreRequestCompleted(err) | ||
) | ||
} | ||
@@ -776,12 +794,20 @@ | ||
if (subToMany) { | ||
self._topics[topic] = self._transport.subscribe(topic, { | ||
max: maxMessages | ||
}, handler) | ||
self._topics[topic] = self._transport.subscribe( | ||
topic, | ||
{ | ||
max: maxMessages | ||
}, | ||
handler | ||
) | ||
} else { | ||
const queueGroup = queue || `${Constants.NATS_QUEUEGROUP_PREFIX}.${topic}` | ||
// queue group names allow load balancing of services | ||
self._topics[topic] = self._transport.subscribe(topic, { | ||
'queue': queueGroup, | ||
max: maxMessages | ||
}, handler) | ||
self._topics[topic] = self._transport.subscribe( | ||
topic, | ||
{ | ||
queue: queueGroup, | ||
max: maxMessages | ||
}, | ||
handler | ||
) | ||
} | ||
@@ -791,2 +817,107 @@ } | ||
/** | ||
* | ||
* @param {any} extensionError | ||
* @param {any} value | ||
* @memberof Hemera | ||
*/ | ||
_onServerPreRequestCompleted(extensionError) { | ||
const self = this | ||
// check if any error was set before | ||
if (extensionError) { | ||
self._reply.error = extensionError | ||
self.finish() | ||
return | ||
} | ||
// check if a handler is registered with this pattern | ||
if (self._actMeta) { | ||
const args = [self, self._request, self._reply] | ||
self._series(self, self._ext['onServerPreHandler'], args, err => | ||
self._onServerPreHandlerCompleted(err) | ||
) | ||
} else { | ||
const internalError = new Errors.PatternNotFound( | ||
Constants.PATTERN_NOT_FOUND, | ||
self.errorDetails | ||
) | ||
self.log.error(internalError) | ||
self._reply.error = internalError | ||
self.emit('serverResponseError', self._reply.error) | ||
// send error back to callee | ||
self.finish() | ||
} | ||
} | ||
/** | ||
* | ||
* | ||
* @param {any} extensionError | ||
* @memberof Hemera | ||
*/ | ||
_onServerPreHandlerCompleted(extensionError) { | ||
const self = this | ||
// check if any error was set before | ||
if (extensionError) { | ||
self._reply.error = extensionError | ||
const internalError = new Errors.HemeraError( | ||
Constants.EXTENSION_ERROR, | ||
self.errorDetails | ||
).causedBy(extensionError) | ||
self.log.error(internalError) | ||
self.emit('serverResponseError', self._reply.error) | ||
self.finish() | ||
return | ||
} | ||
try { | ||
let action = self._actMeta.action.bind(self) | ||
self._actMeta.run(self._request, self._reply, err => { | ||
if (err) { | ||
self._reply.error = err | ||
const internalError = new Errors.HemeraError( | ||
Constants.ADD_MIDDLEWARE_ERROR, | ||
self.errorDetails | ||
).causedBy(err) | ||
self.log.error(internalError) | ||
self.emit('serverResponseError', self._reply.error) | ||
self.finish() | ||
return | ||
} | ||
// if request type is 'pubsub' we dont have to reply back | ||
if ( | ||
self._request.payload.request.type === Constants.REQUEST_TYPE_PUBSUB | ||
) { | ||
action(self._request.payload.pattern) | ||
self.finish() | ||
return | ||
} | ||
// execute RPC action | ||
if (self._actMeta.isPromisable) { | ||
action(self._request.payload.pattern) | ||
.then(x => self._actionHandler(null, x)) | ||
.catch(e => self._actionHandler(e)) | ||
} else { | ||
action(self._request.payload.pattern, (err, result) => | ||
self._actionHandler(err, result) | ||
) | ||
} | ||
}) | ||
} catch (err) { | ||
self._reply.error = self.getRootError(err) | ||
// service should exit | ||
self._shouldCrash = true | ||
self.finish() | ||
} | ||
} | ||
/** | ||
* Unsubscribe a topic or subscription id from NATS and Hemera | ||
@@ -800,7 +931,9 @@ * | ||
*/ | ||
remove (topic, maxMessages) { | ||
remove(topic, maxMessages) { | ||
const self = this | ||
if (!topic) { | ||
let error = new Errors.HemeraError(Constants.TOPIC_SID_REQUIRED_FOR_DELETION) | ||
let error = new Errors.HemeraError( | ||
Constants.TOPIC_SID_REQUIRED_FOR_DELETION | ||
) | ||
self.log.error(error) | ||
@@ -832,3 +965,3 @@ throw error | ||
*/ | ||
cleanTopic (topic) { | ||
cleanTopic(topic) { | ||
// release topic so we can add it again | ||
@@ -852,3 +985,3 @@ delete this._topics[topic] | ||
*/ | ||
add (pattern, cb) { | ||
add(pattern, cb) { | ||
// check for use quick syntax for JSON objects | ||
@@ -888,3 +1021,2 @@ if (_.isString(pattern)) { | ||
// cb is null when we use chaining syntax | ||
if (cb) { | ||
@@ -908,5 +1040,8 @@ // set callback | ||
this.log.error({ | ||
pattern | ||
}, error) | ||
this.log.error( | ||
{ | ||
pattern | ||
}, | ||
error | ||
) | ||
this.emit('error', error) | ||
@@ -925,3 +1060,4 @@ } | ||
pattern.maxMessages$, | ||
pattern.queue$) | ||
pattern.queue$ | ||
) | ||
@@ -938,3 +1074,3 @@ return addDefinition | ||
*/ | ||
_sendRequestHandler (response) { | ||
_sendRequestHandler(response) { | ||
const self = this | ||
@@ -948,6 +1084,8 @@ const res = self._decoderPipeline.run(response, self) | ||
if (self._response.error) { | ||
let error = new Errors.ParseError(Constants.PAYLOAD_PARSING_ERROR, self.errorDetails).causedBy(self._response.error) | ||
let error = new Errors.ParseError( | ||
Constants.PAYLOAD_PARSING_ERROR, | ||
self.errorDetails | ||
).causedBy(self._response.error) | ||
self.log.error(error) | ||
self.emit('clientResponseError', error) | ||
self._execute(error) | ||
@@ -957,12 +1095,12 @@ return | ||
self._extensions.onClientPostRequest.dispatch(self, (err) => onClientPostRequest(self, err)) | ||
// Execute onClientPostRequest extension | ||
self._series(self, self._ext['onClientPostRequest'], [self], err => | ||
self._onClientPostRequestCompleted(err) | ||
) | ||
} catch (err) { | ||
let error = null | ||
if (err instanceof SuperError) { | ||
error = err.rootCause || err.cause || err | ||
} else { | ||
error = err | ||
} | ||
const internalError = new Errors.FatalError(Constants.FATAL_ERROR, self.errorDetails).causedBy(err) | ||
let error = self.getRootError(err) | ||
const internalError = new Errors.FatalError( | ||
Constants.FATAL_ERROR, | ||
self.errorDetails | ||
).causedBy(err) | ||
self.log.fatal(internalError) | ||
@@ -974,2 +1112,4 @@ self.emit('clientResponseError', error) | ||
self.fatal() | ||
} else { | ||
self._execute(error) | ||
} | ||
@@ -980,2 +1120,39 @@ } | ||
/** | ||
* | ||
* | ||
* @param {any} extensionError | ||
* @memberof Hemera | ||
*/ | ||
_onClientPostRequestCompleted(extensionError) { | ||
const self = this | ||
if (extensionError) { | ||
let error = self.getRootError(extensionError) | ||
const internalError = new Errors.HemeraError( | ||
Constants.EXTENSION_ERROR, | ||
self.errorDetails | ||
).causedBy(extensionError) | ||
self.log.error(internalError) | ||
self.emit('clientResponseError', error) | ||
self._execute(error) | ||
return | ||
} | ||
if (self._response.payload.error) { | ||
let error = Errio.fromObject(self._response.payload.error) | ||
const internalError = new Errors.BusinessError( | ||
Constants.BUSINESS_ERROR, | ||
self.errorDetails | ||
).causedBy(error) | ||
self.log.error(internalError) | ||
self.emit('clientResponseError', error) | ||
self._execute(error) | ||
return | ||
} | ||
self._execute(null, self._response.payload.result) | ||
} | ||
/** | ||
* Start an action. | ||
@@ -988,3 +1165,3 @@ * | ||
*/ | ||
act (pattern, cb) { | ||
act(pattern, cb) { | ||
// check for use quick syntax for JSON objects | ||
@@ -1002,16 +1179,20 @@ if (_.isString(pattern)) { | ||
// create new execution context | ||
let ctx = this.createContext() | ||
ctx._pattern = pattern | ||
ctx._prevContext = this | ||
ctx._cleanPattern = Util.cleanFromSpecialVars(pattern) | ||
ctx._response = new ClientResponse() | ||
ctx._request = new ClientRequest() | ||
ctx._isServer = false | ||
ctx._execute = null | ||
ctx._hasCallback = false | ||
ctx._isPromisable = false | ||
let hemera = this.createContext() | ||
hemera._pattern = pattern | ||
hemera._prevContext = this | ||
hemera._cleanPattern = Util.cleanFromSpecialVars(pattern) | ||
hemera._response = new ClientResponse() | ||
hemera._request = new ClientRequest() | ||
hemera._isServer = false | ||
hemera._execute = null | ||
hemera._defer = pDefer() | ||
hemera._isPromisable = false | ||
hemera._actCallback = null | ||
// topic is needed to subscribe on a subject in NATS | ||
if (!pattern.topic) { | ||
let error = new Errors.HemeraError(Constants.NO_TOPIC_TO_REQUEST, ctx.errorDetails) | ||
let error = new Errors.HemeraError( | ||
Constants.NO_TOPIC_TO_REQUEST, | ||
hemera.errorDetails | ||
) | ||
this.log.error(error) | ||
@@ -1022,57 +1203,117 @@ throw error | ||
if (cb) { | ||
ctx._hasCallback = true | ||
if (Util.isGeneratorFunction(cb)) { | ||
ctx._actCallback = Co.wrap(cb.bind(ctx)) | ||
ctx._isPromisable = true | ||
} else if (Util.isAsyncFunction(cb)) { | ||
ctx._actCallback = cb.bind(ctx) | ||
ctx._isPromisable = true | ||
if (Util.isAsyncFunction(cb)) { | ||
hemera._actCallback = cb.bind(hemera) | ||
hemera._isPromisable = true | ||
} else { | ||
ctx._actCallback = cb.bind(ctx) | ||
ctx._isPromisable = false | ||
hemera._actCallback = cb.bind(hemera) | ||
hemera._isPromisable = false | ||
} | ||
} | ||
const promise = new Promise((resolve, reject) => { | ||
ctx._execute = (err, result) => { | ||
if (ctx._config.circuitBreaker.enabled) { | ||
const circuitBreaker = ctx._circuitBreakerMap.get(ctx.trace$.method) | ||
if (err) { | ||
circuitBreaker.failure() | ||
} else { | ||
circuitBreaker.success() | ||
} | ||
} | ||
if (ctx._hasCallback) { | ||
if (ctx._isPromisable) { | ||
ctx._actCallback(err, result) | ||
.then(x => resolve(x)) | ||
.catch(x => reject(x)) | ||
} else { | ||
// any return value in a callback function will fullfilled the | ||
// promise but an error will reject it | ||
const r = ctx._actCallback(err, result) | ||
if (r instanceof Error) { | ||
reject(r) | ||
} else { | ||
resolve(r) | ||
} | ||
} | ||
hemera._execute = (err, result) => { | ||
if (hemera._isPromisable) { | ||
hemera | ||
._actCallback(err, result) | ||
.then(hemera._defer.resolve) | ||
.catch(hemera._defer.reject) | ||
} else if (hemera._actCallback) { | ||
const res = hemera._actCallback(err, result) | ||
hemera._defer.resolve(res) | ||
} else { | ||
if (err) { | ||
hemera._defer.reject(err) | ||
} else { | ||
if (err) { | ||
reject(err) | ||
} else { | ||
resolve(result) | ||
} | ||
hemera._defer.resolve(result) | ||
} | ||
} | ||
}) | ||
} | ||
ctx._extensions.onClientPreRequest.dispatch(ctx, (err) => onPreRequest(ctx, err)) | ||
// Execute onClientPreRequest extension | ||
hemera._series(hemera, hemera._ext['onClientPreRequest'], [hemera], err => | ||
hemera._onPreRequestCompleted(err) | ||
) | ||
return promise | ||
return hemera._defer.promise | ||
} | ||
/** | ||
* | ||
* | ||
* @param {any} err | ||
* @returns | ||
* @memberof Hemera | ||
*/ | ||
getRootError(err) { | ||
let error = null | ||
if (err instanceof SuperError) { | ||
error = err.rootCause || err.cause || err | ||
} else { | ||
error = err | ||
} | ||
return error | ||
} | ||
/** | ||
* | ||
* | ||
* @param {any} err | ||
* @memberof Hemera | ||
*/ | ||
_onPreRequestCompleted(err) { | ||
const self = this | ||
let m = self._encoderPipeline.run(self._message, self) | ||
// encoding issue | ||
if (m.error) { | ||
let error = new Errors.ParseError( | ||
Constants.PAYLOAD_PARSING_ERROR | ||
).causedBy(m.error) | ||
self.log.error(error) | ||
self.emit('clientResponseError', error) | ||
self._execute(error) | ||
return | ||
} | ||
if (err) { | ||
let error = self.getRootError(err) | ||
const internalError = new Errors.HemeraError( | ||
Constants.EXTENSION_ERROR | ||
).causedBy(err) | ||
self.log.error(internalError) | ||
self.emit('clientResponseError', error) | ||
self._execute(error) | ||
return | ||
} | ||
self._request.payload = m.value | ||
self._request.error = m.error | ||
// use simple publish mechanism instead of request/reply | ||
if (self._pattern.pubsub$ === true) { | ||
if (self._actCallback) { | ||
self.log.info(Constants.PUB_CALLBACK_REDUNDANT) | ||
} | ||
self._transport.send(self._pattern.topic, self._request.payload) | ||
} else { | ||
const optOptions = {} | ||
// limit on the number of responses the requestor may receive | ||
if (self._pattern.maxMessages$ > 0) { | ||
optOptions.max = self._pattern.maxMessages$ | ||
} else if (self._pattern.maxMessages$ !== -1) { | ||
optOptions.max = 1 | ||
} | ||
// send request | ||
self._sid = self._transport.sendRequest( | ||
self._pattern.topic, | ||
self._request.payload, | ||
optOptions, | ||
resp => self._sendRequestHandler(resp) | ||
) | ||
// handle timeout | ||
self.handleTimeout() | ||
} | ||
} | ||
/** | ||
* Handle the timeout when a pattern could not be resolved. Can have different reasons: | ||
@@ -1086,3 +1327,3 @@ * - No one was connected at the time (service unavailable) | ||
*/ | ||
handleTimeout () { | ||
handleTimeout() { | ||
const self = this | ||
@@ -1092,7 +1333,14 @@ const timeout = self._pattern.timeout$ || this._config.timeout | ||
let timeoutHandler = () => { | ||
const error = new Errors.TimeoutError(Constants.ACT_TIMEOUT_ERROR, self.errorDetails) | ||
const error = new Errors.TimeoutError( | ||
Constants.ACT_TIMEOUT_ERROR, | ||
self.errorDetails | ||
) | ||
self.log.error(error) | ||
self._response.error = error | ||
self.emit('clientResponseError', error) | ||
self._extensions.onClientPostRequest.dispatch(self, (err) => onClientTimeoutPostRequest(self, err)) | ||
// Execute onClientPostRequest extension | ||
self._series(self, self._ext['onClientPostRequest'], [self], err => | ||
self._onClientTimeoutPostRequestCompleted(err) | ||
) | ||
} | ||
@@ -1104,2 +1352,39 @@ | ||
/** | ||
* | ||
* | ||
* @param {any} err | ||
* @memberof Hemera | ||
*/ | ||
_onClientTimeoutPostRequestCompleted(err) { | ||
const self = this | ||
if (err) { | ||
let error = self.getRootError(err) | ||
const internalError = new Errors.HemeraError( | ||
Constants.EXTENSION_ERROR | ||
).causedBy(err) | ||
self.log.error(internalError) | ||
self._response.error = error | ||
self.emit('clientResponseError', error) | ||
} | ||
try { | ||
self._execute(self._response.error) | ||
} catch (err) { | ||
let error = self.getRootError(err) | ||
const internalError = new Errors.FatalError( | ||
Constants.FATAL_ERROR, | ||
self.errorDetails | ||
).causedBy(err) | ||
self.log.fatal(internalError) | ||
self.emit('clientResponseError', error) | ||
// let it crash | ||
if (self._config.crashOnFatal) { | ||
self.fatal() | ||
} | ||
} | ||
} | ||
/** | ||
* Create new instance of hemera but based on the current prototype | ||
@@ -1112,8 +1397,8 @@ * so we are able to create a scope per act without lossing the reference to the core api. | ||
*/ | ||
createContext () { | ||
createContext() { | ||
const self = this | ||
const ctx = Object.create(self) | ||
const hemera = Object.create(self) | ||
return ctx | ||
return hemera | ||
} | ||
@@ -1130,3 +1415,3 @@ | ||
*/ | ||
list (pattern, options) { | ||
list(pattern, options) { | ||
return this._router.list(pattern, options) | ||
@@ -1140,3 +1425,3 @@ } | ||
*/ | ||
removeAll () { | ||
removeAll() { | ||
_.each(this._topics, (val, key) => this.remove(key)) | ||
@@ -1146,10 +1431,46 @@ } | ||
/** | ||
* Gracefully shutdown of all resources. | ||
* Unsubscribe all subscriptiuons and close the underlying NATS connection | ||
* | ||
* | ||
* @memberof Hemera | ||
*/ | ||
close(cb) { | ||
this.shutdown((err, instance, done) => { | ||
instance._onClose(err, err => { | ||
done(err) | ||
cb(err) | ||
}) | ||
}) | ||
} | ||
/** | ||
* | ||
* | ||
* @param {any} err | ||
* @param {any} cb | ||
* @memberof Hemera | ||
*/ | ||
close (cb) { | ||
this._extensions.onClose.dispatch(this, (err, val) => onClose(this, err, val, cb)) | ||
_onClose(err, cb) { | ||
const self = this | ||
// remove all active subscriptions | ||
self.removeAll() | ||
// Waiting before all queued messages was proceed | ||
// and then close hemera and nats | ||
self._transport.flush(() => { | ||
self._heavy.stop() | ||
// Does not throw an issue when connection is not available | ||
self._transport.close() | ||
if (err) { | ||
self.log.error(err) | ||
self.emit('error', err) | ||
if (_.isFunction(cb)) { | ||
cb(err) | ||
} | ||
} else { | ||
if (_.isFunction(cb)) { | ||
cb() | ||
} | ||
} | ||
}) | ||
} | ||
@@ -1156,0 +1477,0 @@ } |
@@ -12,2 +12,4 @@ 'use strict' | ||
const Constants = require('./constants') | ||
/** | ||
@@ -23,10 +25,12 @@ * | ||
* @param {any} response | ||
* @param {any} extensionCallback | ||
* @param {any} hemera | ||
* | ||
* @memberof Reply | ||
*/ | ||
constructor (request, response, extensionCallback) { | ||
constructor(request, response, logger) { | ||
this._request = request | ||
this._response = response | ||
this.extensionCallback = extensionCallback | ||
this.log = logger | ||
this.sent = false | ||
this._errored = false | ||
} | ||
@@ -41,3 +45,3 @@ | ||
*/ | ||
set payload (value) { | ||
set payload(value) { | ||
this._response.payload = value | ||
@@ -53,3 +57,3 @@ } | ||
*/ | ||
get payload () { | ||
get payload() { | ||
return this._response.payload | ||
@@ -59,8 +63,14 @@ } | ||
/** | ||
* Set the response error | ||
* Error can not be set twice | ||
* | ||
* | ||
* | ||
* @memberof Reply | ||
*/ | ||
set error (value) { | ||
set error(value) { | ||
if (this._errored) { | ||
this.log.debug(new Error(Constants.REPLY_ERROR_ALREADY_SET)) | ||
return | ||
} | ||
this._errored = true | ||
this._response.error = value | ||
@@ -76,3 +86,3 @@ } | ||
*/ | ||
get error () { | ||
get error() { | ||
return this._response.error | ||
@@ -82,28 +92,23 @@ } | ||
/** | ||
* Abort the current request and respond wih the passed value | ||
* Set the response error | ||
* | ||
* @param {any} value | ||
* | ||
* @param {any} msg | ||
* @memberof Reply | ||
*/ | ||
end (value) { | ||
if (value instanceof Error) { | ||
this.extensionCallback(value) | ||
} else { | ||
this.extensionCallback(null, value, true) | ||
send(msg) { | ||
const self = this | ||
if (self.sent) { | ||
self.log.warn(new Error(Constants.REPLY_ALREADY_SENT)) | ||
return | ||
} | ||
} | ||
/** | ||
* Runs through all extensions and keep the passed value to respond it | ||
* | ||
* @param {any} value | ||
* | ||
* @memberof Reply | ||
*/ | ||
send (value) { | ||
if (value instanceof Error) { | ||
this.extensionCallback(value) | ||
} else { | ||
this.extensionCallback(null, value) | ||
self.sent = true | ||
if (msg) { | ||
if (msg instanceof Error) { | ||
self.error = msg | ||
} else { | ||
self.payload = msg | ||
} | ||
} | ||
@@ -110,0 +115,0 @@ } |
@@ -12,7 +12,7 @@ 'use strict' | ||
function nanoToMsString (val) { | ||
function nanoToMsString(val) { | ||
return (val / 1e6).toFixed(2) + 'ms' | ||
} | ||
function inbound (ctx) { | ||
function inbound(ctx) { | ||
return { | ||
@@ -25,3 +25,3 @@ id: ctx.request$.id, | ||
function outbound (ctx) { | ||
function outbound(ctx) { | ||
return { | ||
@@ -28,0 +28,0 @@ id: ctx._message.request.id, |
@@ -23,5 +23,4 @@ 'use strict' | ||
*/ | ||
constructor (payload) { | ||
constructor(payload) { | ||
this._request = {} | ||
this._locals = {} | ||
this.payload = payload | ||
@@ -37,3 +36,3 @@ } | ||
*/ | ||
get payload () { | ||
get payload() { | ||
return this._request.value | ||
@@ -46,17 +45,6 @@ } | ||
* @readonly | ||
* | ||
* @memberof ServerRequest | ||
*/ | ||
get locals () { | ||
return this._locals | ||
} | ||
/** | ||
* | ||
* | ||
* @readonly | ||
* @type {*} | ||
* @memberOf ServerRequest | ||
*/ | ||
get error () { | ||
get error() { | ||
return this._request.error | ||
@@ -71,3 +59,3 @@ } | ||
*/ | ||
set payload (value) { | ||
set payload(value) { | ||
this._request.value = value | ||
@@ -82,3 +70,3 @@ } | ||
*/ | ||
set error (error) { | ||
set error(error) { | ||
this._request.error = error | ||
@@ -85,0 +73,0 @@ } |
@@ -12,2 +12,4 @@ 'use strict' | ||
const SuperError = require('super-error') | ||
/** | ||
@@ -24,3 +26,3 @@ * @class ServerResponse | ||
*/ | ||
constructor () { | ||
constructor() { | ||
this._response = {} | ||
@@ -36,3 +38,3 @@ } | ||
*/ | ||
get payload () { | ||
get payload() { | ||
return this._response.value | ||
@@ -47,3 +49,3 @@ } | ||
*/ | ||
set payload (value) { | ||
set payload(value) { | ||
this._response.value = value | ||
@@ -58,4 +60,8 @@ } | ||
*/ | ||
set error (error) { | ||
this._response.error = error | ||
set error(error) { | ||
if (error instanceof SuperError) { | ||
this._response.error = error.rootCause || error.cause || error | ||
} else { | ||
this._response.error = error | ||
} | ||
} | ||
@@ -70,3 +76,3 @@ | ||
*/ | ||
get error () { | ||
get error() { | ||
return this._response.error | ||
@@ -73,0 +79,0 @@ } |
@@ -25,3 +25,3 @@ 'use strict' | ||
*/ | ||
constructor (params) { | ||
constructor(params) { | ||
this.nc = params.transport | ||
@@ -37,3 +37,3 @@ } | ||
*/ | ||
get driver () { | ||
get driver() { | ||
return this.nc | ||
@@ -49,3 +49,3 @@ } | ||
*/ | ||
timeout () { | ||
timeout() { | ||
return this.nc.timeout.apply(this.nc, arguments) | ||
@@ -61,3 +61,3 @@ } | ||
*/ | ||
send () { | ||
send() { | ||
return this.nc.publish.apply(this.nc, arguments) | ||
@@ -73,3 +73,3 @@ } | ||
*/ | ||
close () { | ||
close() { | ||
return this.nc.close.apply(this.nc, arguments) | ||
@@ -84,3 +84,3 @@ } | ||
*/ | ||
flush () { | ||
flush() { | ||
return this.nc.flush.apply(this.nc, arguments) | ||
@@ -96,3 +96,3 @@ } | ||
*/ | ||
subscribe () { | ||
subscribe() { | ||
return this.nc.subscribe.apply(this.nc, arguments) | ||
@@ -108,3 +108,3 @@ } | ||
*/ | ||
unsubscribe () { | ||
unsubscribe() { | ||
return this.nc.unsubscribe.apply(this.nc, arguments) | ||
@@ -120,3 +120,3 @@ } | ||
*/ | ||
sendRequest () { | ||
sendRequest() { | ||
return this.nc.request.apply(this.nc, arguments) | ||
@@ -123,0 +123,0 @@ } |
158
lib/util.js
@@ -13,6 +13,7 @@ 'use strict' | ||
const _ = require('lodash') | ||
const Co = require('co') | ||
const lut = [] | ||
for (let i = 0; i < 256; i++) { lut[i] = (i < 16 ? '0' : '') + (i).toString(16) } | ||
for (let i = 0; i < 256; i++) { | ||
lut[i] = (i < 16 ? '0' : '') + i.toString(16) | ||
} | ||
@@ -32,3 +33,3 @@ /** | ||
*/ | ||
static natsWildcardToRegex (subject) { | ||
static natsWildcardToRegex(subject) { | ||
let hasTokenWildcard = subject.indexOf('*') > -1 | ||
@@ -49,6 +50,2 @@ let hasFullWildcard = subject.indexOf('>') > -1 | ||
/** | ||
* Convert a generator or async function | ||
* to promise factory function and call the last | ||
* argument as callback | ||
* | ||
* @static | ||
@@ -58,19 +55,12 @@ * @param {any} handler | ||
*/ | ||
static toPromiseFact (handler) { | ||
if (Util.isGeneratorFunction(handler)) { | ||
return function () { | ||
static toPromiseFact(handler) { | ||
if (Util.isAsyncFunction(handler)) { | ||
return function() { | ||
// -1 because (req, res, next) | ||
const next = arguments[arguments.length - 1] | ||
return Co(handler.apply(null, arguments)) | ||
return handler | ||
.apply(null, arguments) | ||
.then(x => next(null, x)) | ||
.catch(next) | ||
} | ||
} else if (Util.isAsyncFunction(handler)) { | ||
return function () { | ||
// -1 because (req, res, next) | ||
const next = arguments[arguments.length - 1] | ||
return handler.apply(null, arguments) | ||
.then(x => next(null, x)) | ||
.catch(next) | ||
} | ||
} else { | ||
@@ -85,11 +75,25 @@ return handler | ||
*/ | ||
static randomId () { | ||
const d0 = Math.random() * 0xffffffff | 0 | ||
const d1 = Math.random() * 0xffffffff | 0 | ||
const d2 = Math.random() * 0xffffffff | 0 | ||
const d3 = Math.random() * 0xffffffff | 0 | ||
return lut[d0 & 0xff] + lut[d0 >> 8 & 0xff] + lut[d0 >> 16 & 0xff] + lut[d0 >> 24 & 0xff] + | ||
lut[d1 & 0xff] + lut[d1 >> 8 & 0xff] + lut[d1 >> 16 & 0x0f | 0x40] + lut[d1 >> 24 & 0xff] + | ||
lut[d2 & 0x3f | 0x80] + lut[d2 >> 8 & 0xff] + lut[d2 >> 16 & 0xff] + lut[d2 >> 24 & 0xff] + | ||
lut[d3 & 0xff] + lut[d3 >> 8 & 0xff] + lut[d3 >> 16 & 0xff] + lut[d3 >> 24 & 0xff] | ||
static randomId() { | ||
const d0 = (Math.random() * 0xffffffff) | 0 | ||
const d1 = (Math.random() * 0xffffffff) | 0 | ||
const d2 = (Math.random() * 0xffffffff) | 0 | ||
const d3 = (Math.random() * 0xffffffff) | 0 | ||
return ( | ||
lut[d0 & 0xff] + | ||
lut[(d0 >> 8) & 0xff] + | ||
lut[(d0 >> 16) & 0xff] + | ||
lut[(d0 >> 24) & 0xff] + | ||
lut[d1 & 0xff] + | ||
lut[(d1 >> 8) & 0xff] + | ||
lut[((d1 >> 16) & 0x0f) | 0x40] + | ||
lut[(d1 >> 24) & 0xff] + | ||
lut[(d2 & 0x3f) | 0x80] + | ||
lut[(d2 >> 8) & 0xff] + | ||
lut[(d2 >> 16) & 0xff] + | ||
lut[(d2 >> 24) & 0xff] + | ||
lut[d3 & 0xff] + | ||
lut[(d3 >> 8) & 0xff] + | ||
lut[(d3 >> 16) & 0xff] + | ||
lut[(d3 >> 24) & 0xff] | ||
) | ||
} | ||
@@ -107,3 +111,3 @@ | ||
*/ | ||
static serial (array, method, callback) { | ||
static eachSeries(array, method, callback) { | ||
if (!array.length) { | ||
@@ -113,4 +117,4 @@ callback() | ||
let i = 0 | ||
const iterate = function () { | ||
const done = function (err) { | ||
const iterate = function() { | ||
const done = function(err) { | ||
if (err) { | ||
@@ -135,41 +139,2 @@ callback(err) | ||
/** | ||
* Executes a series of callbacks and allows to interrupt | ||
* as well as to continue with a final value | ||
* | ||
* @param {Array<Function>} array | ||
* @param {Function} method | ||
* @param {Function} callback | ||
* | ||
* @memberOf Extension | ||
*/ | ||
static serialWithCancellation (array, method, callback) { | ||
if (!array.length) { | ||
callback() | ||
} else { | ||
let i = 0 | ||
const iterate = function () { | ||
const done = function (err, value, abort) { | ||
if (err) { | ||
callback(err) | ||
} else if (value && abort) { | ||
callback(null, value) | ||
} else { | ||
i = i + 1 | ||
if (i < array.length) { | ||
iterate(value) | ||
} else { | ||
callback(null, value) | ||
} | ||
} | ||
} | ||
method(array[i], done) | ||
} | ||
iterate() | ||
} | ||
} | ||
/** | ||
* Get high resolution time in nanoseconds | ||
@@ -182,5 +147,5 @@ * | ||
*/ | ||
static nowHrTime () { | ||
static nowHrTime() { | ||
const hrtime = process.hrtime() | ||
return ((+hrtime[0]) * 1e9) + (+hrtime[1]) | ||
return +hrtime[0] * 1e9 + +hrtime[1] | ||
} | ||
@@ -196,6 +161,6 @@ /** | ||
*/ | ||
static extractSchema (obj) { | ||
static extractSchema(obj) { | ||
if (obj === null) return obj | ||
return _.pickBy(obj, function (val, prop) { | ||
return _.pickBy(obj, function(val, prop) { | ||
return _.isObject(val) | ||
@@ -211,7 +176,7 @@ }) | ||
*/ | ||
static cleanPattern (obj) { | ||
static cleanPattern(obj) { | ||
if (obj === null) return obj | ||
return _.pickBy(obj, function (val, prop) { | ||
return !_.includes(prop, '$') && !_.isObject(val) | ||
return _.pickBy(obj, function(val, prop) { | ||
return (!_.includes(prop, '$') && !_.isObject(val)) || _.isRegExp(val) | ||
}) | ||
@@ -227,6 +192,6 @@ } | ||
*/ | ||
static cleanFromSpecialVars (obj) { | ||
static cleanFromSpecialVars(obj) { | ||
if (obj === null) return obj | ||
return _.pickBy(obj, function (val, prop) { | ||
return _.pickBy(obj, function(val, prop) { | ||
return !_.includes(prop, '$') | ||
@@ -242,3 +207,3 @@ }) | ||
*/ | ||
static pattern (args) { | ||
static pattern(args) { | ||
if (_.isString(args)) { | ||
@@ -250,4 +215,4 @@ return args | ||
let sb = [] | ||
_.each(args, function (v, k) { | ||
if (!~k.indexOf('$') && !_.isFunction(v) && !_.isObject(v)) { | ||
_.each(args, function(v, k) { | ||
if ((!~k.indexOf('$') && !_.isFunction(v) && !_.isObject(v)) || _.isRegExp(v)) { | ||
sb.push(k + ':' + v) | ||
@@ -262,3 +227,3 @@ } | ||
/** | ||
/* | ||
* | ||
@@ -269,6 +234,5 @@ * | ||
* @returns | ||
* | ||
* @memberof Util | ||
*/ | ||
static isGeneratorFunction (obj) { | ||
static isAsyncFunction(obj) { | ||
var constructor = obj.constructor | ||
@@ -278,3 +242,6 @@ if (!constructor) { | ||
} | ||
if (constructor.name === 'GeneratorFunction' || constructor.displayName === 'GeneratorFunction') { | ||
if ( | ||
constructor.name === 'AsyncFunction' || | ||
constructor.displayName === 'AsyncFunction' | ||
) { | ||
return true | ||
@@ -284,23 +251,4 @@ } | ||
} | ||
/** | ||
* | ||
* | ||
* @static | ||
* @param {any} obj | ||
* @returns | ||
* @memberof Util | ||
*/ | ||
static isAsyncFunction (obj) { | ||
var constructor = obj.constructor | ||
if (!constructor) { | ||
return false | ||
} | ||
if (constructor.name === 'AsyncFunction' || constructor.displayName === 'AsyncFunction') { | ||
return true | ||
} | ||
return false | ||
} | ||
} | ||
module.exports = Util |
{ | ||
"name": "nats-hemera", | ||
"author": "Dustin Deus (https://github.com/StarpTech)", | ||
"version": "1.6.2", | ||
"version": "2.0.0-0", | ||
"main": "lib/index.js", | ||
@@ -43,11 +43,13 @@ "homepage": "https://hemerajs.github.io/hemera/", | ||
"dependencies": { | ||
"bloomrun": "4.0.0", | ||
"co": "4.6.x", | ||
"avvio": "2.2.x", | ||
"bloomrun": "4.0.x", | ||
"errio": "1.2.x", | ||
"fast-safe-stringify": "1.2.x", | ||
"fastparallel": "2.3.x", | ||
"fastseries": "1.7.x", | ||
"heavy": "4.0.x", | ||
"hoek": "4.2.x", | ||
"joi": "11.0.x", | ||
"joi": "11.1.x", | ||
"lodash": "4.17.x", | ||
"p-defer": "1.0.x", | ||
"pino": "4.7.x", | ||
@@ -54,0 +56,0 @@ "super-error": "2.1.x", |
# Hemera package | ||
[![npm](https://img.shields.io/npm/v/nats-hemera.svg?maxAge=3600)](https://www.npmjs.com/package/nats-hemera) | ||
[![js-standard-style](https://img.shields.io/badge/code%20style-standard-brightgreen.svg)](http://standardjs.com) | ||
[![styled with prettier](https://img.shields.io/badge/styled_with-prettier-ff69b4.svg)](#badge) | ||
This is the core package of hemera. For more details see [Hemera](https://github.com/hemerajs/hemera) |
Sorry, the diff of this file is not supported yet
No v1
QualityPackage is not semver >=1. This means it is not stable and does not support ^ ranges.
Found 1 instance in 1 package
68930
15
22
2670
1
+ Addedavvio@2.2.x
+ Addedfastseries@1.7.x
+ Addedp-defer@1.0.x
+ Addedavvio@2.2.0(transitive)
+ Addedfastq@1.19.0(transitive)
+ Addedfastseries@1.7.2(transitive)
+ Addedjoi@11.1.1(transitive)
+ Addedp-defer@1.0.0(transitive)
- Removedco@4.6.x
- Removedco@4.6.0(transitive)
- Removedjoi@11.0.3(transitive)
Updatedbloomrun@4.0.x
Updatedjoi@11.1.x