seneca-transport
Advanced tools
Comparing version 2.4.0 to 3.0.0
155
lib/http.js
@@ -18,4 +18,4 @@ /* Copyright (c) 2013-2015 Richard Rodger, MIT License */ | ||
exports.listen = function (options, transportUtil) { | ||
return function (msg, callback) { | ||
exports.listen = function(options, transportUtil) { | ||
return function(msg, callback) { | ||
var seneca = this.root.delegate() | ||
@@ -25,3 +25,6 @@ | ||
var server = (listenOptions.protocol === 'https') ? Https.createServer(listenOptions.serverOptions) : Http.createServer() | ||
var server = | ||
listenOptions.protocol === 'https' | ||
? Https.createServer(listenOptions.serverOptions) | ||
: Http.createServer() | ||
@@ -32,6 +35,6 @@ var listener | ||
server.on('request', function (req, res) { | ||
server.on('request', function(req, res) { | ||
internals.timeout(listenOptions, req, res) | ||
req.query = Qs.parse(Url.parse(req.url).query) | ||
internals.setBody(seneca, transportUtil, req, res, function (err) { | ||
internals.setBody(seneca, transportUtil, req, res, function(err) { | ||
if (err) { | ||
@@ -45,7 +48,19 @@ return res.end() | ||
server.on('error', function (err) { | ||
if ('EADDRINUSE' === err.code && listenAttempts < listenOptions.max_listen_attempts) { | ||
server.on('error', function(err) { | ||
if ( | ||
'EADDRINUSE' === err.code && | ||
listenAttempts < listenOptions.max_listen_attempts | ||
) { | ||
listenAttempts++ | ||
seneca.log.warn('listen', 'attempt', listenAttempts, err.code, listenOptions) | ||
setTimeout(listen, 100 + Math.floor(Math.random() * listenOptions.attempt_delay)) | ||
seneca.log.warn( | ||
'listen', | ||
'attempt', | ||
listenAttempts, | ||
err.code, | ||
listenOptions | ||
) | ||
setTimeout( | ||
listen, | ||
100 + Math.floor(Math.random() * listenOptions.attempt_delay) | ||
) | ||
return | ||
@@ -56,3 +71,3 @@ } | ||
server.on('listening', function () { | ||
server.on('listening', function() { | ||
listen_details.port = server.address().port | ||
@@ -63,10 +78,16 @@ seneca.log.debug('listen', listen_details) | ||
function listen () { | ||
function listen() { | ||
listener = server.listen( | ||
listen_details.port = transportUtil.resolveDynamicValue(listenOptions.port, listenOptions), | ||
listen_details.host = transportUtil.resolveDynamicValue(listenOptions.host, listenOptions) | ||
(listen_details.port = transportUtil.resolveDynamicValue( | ||
listenOptions.port, | ||
listenOptions | ||
)), | ||
(listen_details.host = transportUtil.resolveDynamicValue( | ||
listenOptions.host, | ||
listenOptions | ||
)) | ||
) | ||
} | ||
transportUtil.close(seneca, function (done) { | ||
transportUtil.close(seneca, function(done) { | ||
// node 0.10 workaround, otherwise it throws | ||
@@ -83,4 +104,4 @@ if (listener && listener._handle) { | ||
exports.client = function (options, transportUtil) { | ||
return function (msg, callback) { | ||
exports.client = function(options, transportUtil) { | ||
return function(msg, callback) { | ||
var seneca = this.root.delegate() | ||
@@ -93,12 +114,31 @@ | ||
if (options[msg.type].headers) { | ||
defaultHeaders = _.omit(options[msg.type].headers, | ||
['Accept', 'Content-Type', 'Content-Length', 'Cache-Control', 'seneca-id', | ||
'seneca-kind', 'seneca-origin', 'seneca-track', 'seneca-time-client-sent', | ||
'seneca-accept', 'seneca-time-listen-recv', 'seneca-time-listen-sent']) | ||
defaultHeaders = _.omit(options[msg.type].headers, [ | ||
'Accept', | ||
'Content-Type', | ||
'Content-Length', | ||
'Cache-Control', | ||
'seneca-id', | ||
'seneca-kind', | ||
'seneca-origin', | ||
'seneca-track', | ||
'seneca-time-client-sent', | ||
'seneca-accept', | ||
'seneca-time-listen-recv', | ||
'seneca-time-listen-sent' | ||
]) | ||
} | ||
var send = function (spec, topic, send_done) { | ||
var host = transportUtil.resolveDynamicValue(clientOptions.host, clientOptions) | ||
var port = transportUtil.resolveDynamicValue(clientOptions.port, clientOptions) | ||
var path = transportUtil.resolveDynamicValue(clientOptions.path, clientOptions) | ||
var send = function(spec, topic, send_done) { | ||
var host = transportUtil.resolveDynamicValue( | ||
clientOptions.host, | ||
clientOptions | ||
) | ||
var port = transportUtil.resolveDynamicValue( | ||
clientOptions.port, | ||
clientOptions | ||
) | ||
var path = transportUtil.resolveDynamicValue( | ||
clientOptions.path, | ||
clientOptions | ||
) | ||
@@ -111,7 +151,7 @@ // never use a 0.0.0.0 as targeted host, because Windows can't handle it | ||
function action (msg, done, meta) { | ||
function action(msg, done, meta) { | ||
var data = transportUtil.prepare_request(this, msg, done, meta) | ||
var headers = { | ||
'Accept': 'application/json', | ||
Accept: 'application/json', | ||
'Content-Type': 'application/json', | ||
@@ -121,3 +161,7 @@ 'seneca-id': data.id, | ||
'seneca-origin': seneca.id, | ||
'seneca-track': transportUtil.stringifyJSON(seneca, 'send-track', data.track || []), | ||
'seneca-track': transportUtil.stringifyJSON( | ||
seneca, | ||
'send-track', | ||
data.track || [] | ||
), | ||
'seneca-time-client-sent': data.time.client_sent | ||
@@ -137,3 +181,3 @@ } | ||
Wreck.post(url, requestOptions, function (err, res, payload) { | ||
Wreck.post(url, requestOptions, function(err, res, payload) { | ||
var response = { | ||
@@ -159,4 +203,3 @@ kind: 'res', | ||
} | ||
} | ||
else { | ||
} else { | ||
response.id = data.id | ||
@@ -172,3 +215,3 @@ response.origin = seneca.id | ||
transportUtil.close(seneca, function (done) { | ||
transportUtil.close(seneca, function(done) { | ||
done() | ||
@@ -181,13 +224,15 @@ }) | ||
internals.setBody = function (seneca, transportUtil, req, res, next) { | ||
internals.setBody = function(seneca, transportUtil, req, res, next) { | ||
var buf = [] | ||
req.setEncoding('utf8') | ||
req.on('data', function (chunk) { | ||
req.on('data', function(chunk) { | ||
buf.push(chunk) | ||
}) | ||
req.on('end', function () { | ||
req.on('end', function() { | ||
try { | ||
var bufstr = buf.join('') | ||
var bodydata = bufstr.length ? transportUtil.parseJSON(seneca, 'req-body', bufstr) : {} | ||
var bodydata = bufstr.length | ||
? transportUtil.parseJSON(seneca, 'req-body', bufstr) | ||
: {} | ||
@@ -207,5 +252,5 @@ if (bodydata instanceof Error) { | ||
// deprecated | ||
(req.query && req.query.args$) ? Jsonic(req.query.args$) : {}, | ||
req.query && req.query.args$ ? Jsonic(req.query.args$) : {}, | ||
(req.query && req.query.msg$) ? Jsonic(req.query.msg$) : {}, | ||
req.query && req.query.msg$ ? Jsonic(req.query.msg$) : {}, | ||
req.query || {} | ||
@@ -215,4 +260,3 @@ ) | ||
next() | ||
} | ||
catch (err) { | ||
} catch (err) { | ||
res.write(err.message + ': ' + bufstr) | ||
@@ -225,5 +269,12 @@ res.statusCode = 400 | ||
internals.trackHeaders = function (listenOptions, seneca, transportUtil, req, res) { | ||
if (req.url.indexOf(listenOptions.path) !== 0) { | ||
return | ||
internals.trackHeaders = function( | ||
listenOptions, | ||
seneca, | ||
transportUtil, | ||
req, | ||
res | ||
) { | ||
if (Url.parse(req.url).pathname !== listenOptions.path) { | ||
res.statusCode = 404 | ||
return res.end() | ||
} | ||
@@ -236,3 +287,8 @@ var data | ||
origin: req.headers['seneca-origin'], | ||
track: transportUtil.parseJSON(seneca, 'track-receive', req.headers['seneca-track']) || [], | ||
track: | ||
transportUtil.parseJSON( | ||
seneca, | ||
'track-receive', | ||
req.headers['seneca-track'] | ||
) || [], | ||
time: { | ||
@@ -259,3 +315,3 @@ client_sent: req.headers['seneca-time-client-sent'] | ||
transportUtil.handle_request(seneca, data, listenOptions, function (out) { | ||
transportUtil.handle_request(seneca, data, listenOptions, function(out) { | ||
internals.sendResponse(seneca, transportUtil, res, out, data) | ||
@@ -265,3 +321,3 @@ }) | ||
internals.sendResponse = function (seneca, transportUtil, res, out, data) { | ||
internals.sendResponse = function(seneca, transportUtil, res, out, data) { | ||
var outJson = 'null' | ||
@@ -273,4 +329,3 @@ var httpcode = 200 | ||
outJson = transportUtil.stringifyJSON(seneca, 'listen-web', out.res) | ||
} | ||
else if (out && out.error) { | ||
} else if (out && out.error) { | ||
httpcode = out.error.statusCode || 500 | ||
@@ -302,4 +357,4 @@ outJson = transportUtil.stringifyJSON(seneca, 'listen-web', out.error) | ||
internals.timeout = function (listenOptions, req, res) { | ||
var id = setTimeout(function () { | ||
internals.timeout = function(listenOptions, req, res) { | ||
var id = setTimeout(function() { | ||
res.statusCode = 503 | ||
@@ -310,3 +365,3 @@ res.statusMessage = 'Response timeout' | ||
var clearTimeoutId = function () { | ||
var clearTimeoutId = function() { | ||
clearTimeout(id) | ||
@@ -313,0 +368,0 @@ } |
195
lib/tcp.js
@@ -10,3 +10,2 @@ /* Copyright (c) 2013-2015 Richard Rodger, MIT License */ | ||
var Reconnect = require('reconnect-core') | ||
var _ = require('lodash') | ||
@@ -16,4 +15,4 @@ // Declare internals | ||
exports.listen = function (options, transportUtil) { | ||
return function (args, callback) { | ||
exports.listen = function(options, transportUtil) { | ||
return function(args, callback) { | ||
var seneca = this.root.delegate() | ||
@@ -26,13 +25,19 @@ | ||
var listener = Net.createServer(function (connection) { | ||
seneca.log.debug('listen', 'connection', listenOptions, | ||
'remote', connection.remoteAddress, connection.remotePort) | ||
var listener = Net.createServer(function(connection) { | ||
seneca.log.debug( | ||
'listen', | ||
'connection', | ||
listenOptions, | ||
'remote', | ||
connection.remoteAddress, | ||
connection.remotePort | ||
) | ||
var parser = Ndjson.parse() | ||
var stringifier = Ndjson.stringify() | ||
parser.on('error', function (error) { | ||
parser.on('error', function(error) { | ||
console.error(error) | ||
connection.end() | ||
}) | ||
parser.on('data', function (data) { | ||
parser.on('data', function(data) { | ||
if (data instanceof Error) { | ||
@@ -47,3 +52,3 @@ var out = transportUtil.prepareResponse(seneca, {}) | ||
transportUtil.handle_request(seneca, data, options, function (out) { | ||
transportUtil.handle_request(seneca, data, options, function(out) { | ||
if (out === null || !out.sync) { | ||
@@ -60,4 +65,9 @@ return | ||
connection.on('error', function (err) { | ||
seneca.log.error('listen', 'pipe-error', listenOptions, err && err.stack) | ||
connection.on('error', function(err) { | ||
seneca.log.error( | ||
'listen', | ||
'pipe-error', | ||
listenOptions, | ||
err && err.stack | ||
) | ||
}) | ||
@@ -68,3 +78,3 @@ | ||
listener.once('listening', function () { | ||
listener.once('listening', function() { | ||
listenOptions.port = listener.address().port | ||
@@ -75,9 +85,21 @@ seneca.log.debug('listen', 'open', listenOptions) | ||
listener.on('error', function (err) { | ||
listener.on('error', function(err) { | ||
seneca.log.error('listen', 'net-error', listenOptions, err && err.stack) | ||
if ('EADDRINUSE' === err.code && listenAttempts < listenOptions.max_listen_attempts) { | ||
if ( | ||
'EADDRINUSE' === err.code && | ||
listenAttempts < listenOptions.max_listen_attempts | ||
) { | ||
listenAttempts++ | ||
seneca.log.warn('listen', 'attempt', listenAttempts, err.code, listenOptions) | ||
setTimeout(listen, 100 + Math.floor(Math.random() * listenOptions.attempt_delay)) | ||
seneca.log.warn( | ||
'listen', | ||
'attempt', | ||
listenAttempts, | ||
err.code, | ||
listenOptions | ||
) | ||
setTimeout( | ||
listen, | ||
100 + Math.floor(Math.random() * listenOptions.attempt_delay) | ||
) | ||
return | ||
@@ -87,11 +109,10 @@ } | ||
listener.on('close', function () { | ||
listener.on('close', function() { | ||
seneca.log.debug('listen', 'close', listenOptions) | ||
}) | ||
function listen () { | ||
function listen() { | ||
if (listenOptions.path) { | ||
listener.listen(listenOptions.path) | ||
} | ||
else { | ||
} else { | ||
listener.listen(listenOptions.port, listenOptions.host) | ||
@@ -102,3 +123,3 @@ } | ||
transportUtil.close(seneca, function (next) { | ||
transportUtil.close(seneca, function(next) { | ||
// node 0.10 workaround, otherwise it throws | ||
@@ -114,5 +135,9 @@ if (listener._handle) { | ||
exports.client = function (options, transportUtil) { | ||
return function (args, callback) { | ||
exports.client = function(options, transportUtil) { | ||
return function(args, callback) { | ||
var seneca = this.root.delegate() | ||
var conStream | ||
var connection | ||
var established = false | ||
var stringifier | ||
@@ -125,22 +150,19 @@ var type = args.type | ||
var clientOptions = seneca.util.deepextend(options[args.type], args) | ||
clientOptions.host = !args.host && clientOptions.host === '0.0.0.0' ? '127.0.0.1' : clientOptions.host | ||
clientOptions.host = | ||
!args.host && clientOptions.host === '0.0.0.0' | ||
? '127.0.0.1' | ||
: clientOptions.host | ||
var send = function (spec, topic, send_done) { | ||
seneca.log.debug('client', type, 'send-init', spec, topic, clientOptions) | ||
var connect = function() { | ||
seneca.log.debug('client', type, 'send-init', '', '', clientOptions) | ||
var connections = [] | ||
var established = false | ||
var reconnect = internals.reconnect(function (stream) { | ||
// unique connections are by the options e.g. host:port | ||
// don't need to pipe everything again if it exists | ||
// established is for a race condition for `connect` event pushing | ||
var existing = _.find(connections, { clientOptions: clientOptions }) | ||
if (!established || existing && existing.setup) { | ||
return | ||
} | ||
var msger = internals.clientMessager(seneca, clientOptions, transportUtil) | ||
var reconnect = internals.reconnect(function(stream) { | ||
conStream = stream | ||
var msger = internals.clientMessager( | ||
seneca, | ||
clientOptions, | ||
transportUtil | ||
) | ||
var parser = Ndjson.parse() | ||
var stringifier = Ndjson.stringify() | ||
stringifier = Ndjson.stringify() | ||
@@ -153,30 +175,39 @@ stream | ||
existing.setup = true | ||
if (!established) reconnect.emit('s_connected', stringifier) | ||
established = true | ||
}) | ||
send_done(null, function (args, done, meta) { | ||
var outmsg = transportUtil.prepare_request(this, args, done, meta) | ||
if (!outmsg.replied) stringifier.write(outmsg) | ||
}) | ||
reconnect.on('connect', function(connection) { | ||
seneca.log.debug('client', type, 'connect', '', '', clientOptions) | ||
// connection.clientOptions = clientOptions // unique per connection | ||
// connections.push(connection) | ||
// established = true | ||
}) | ||
reconnect.on('connect', function (connection) { | ||
seneca.log.debug('client', type, 'connect', spec, topic, clientOptions) | ||
connection.clientOptions = clientOptions // unique per connection | ||
connections.push(connection) | ||
established = true | ||
reconnect.on('reconnect', function() { | ||
seneca.log.debug('client', type, 'reconnect', '', '', clientOptions) | ||
}) | ||
reconnect.on('reconnect', function () { | ||
seneca.log.debug('client', type, 'reconnect', spec, topic, clientOptions) | ||
}) | ||
reconnect.on('disconnect', function (err) { | ||
seneca.log.debug('client', type, 'disconnect', spec, topic, clientOptions, | ||
(err && err.stack) || err) | ||
reconnect.on('disconnect', function(err) { | ||
seneca.log.debug( | ||
'client', | ||
type, | ||
'disconnect', | ||
'', | ||
'', | ||
clientOptions, | ||
(err && err.stack) || err | ||
) | ||
var conn = _.find(connections, { clientOptions: clientOptions }) | ||
if (conn) { | ||
conn.setup = false | ||
} | ||
established = false | ||
}) | ||
reconnect.on('error', function (err) { | ||
seneca.log.debug('client', type, 'error', spec, topic, clientOptions, err.stack) | ||
reconnect.on('error', function(err) { | ||
seneca.log.debug( | ||
'client', | ||
type, | ||
'error', | ||
'', | ||
'', | ||
clientOptions, | ||
err.stack | ||
) | ||
}) | ||
@@ -189,9 +220,30 @@ | ||
transportUtil.close(seneca, function (done) { | ||
transportUtil.close(seneca, function(done) { | ||
reconnect.disconnect() | ||
internals.closeConnections(connections, seneca) | ||
internals.closeConnections([conStream], seneca) | ||
done() | ||
}) | ||
return reconnect | ||
} | ||
function getClient(cb) { | ||
if (!connection) connection = connect() | ||
if (established) { | ||
cb(stringifier) | ||
} else { | ||
connection.once('s_connected', cb) | ||
} | ||
} | ||
var send = function(spec, topic, send_done) { | ||
send_done(null, function(args, done, meta) { | ||
var self = this | ||
getClient(function(stringifier) { | ||
var outmsg = transportUtil.prepare_request(self, args, done, meta) | ||
if (!outmsg.replied) stringifier.write(outmsg) | ||
}) | ||
}) | ||
} | ||
transportUtil.make_client(seneca, send, clientOptions, callback) | ||
@@ -201,6 +253,6 @@ } | ||
internals.clientMessager = function (seneca, options, transportUtil) { | ||
internals.clientMessager = function(seneca, options, transportUtil) { | ||
var messager = new Stream.Duplex({ objectMode: true }) | ||
messager._read = function () {} | ||
messager._write = function (data, enc, callback) { | ||
messager._read = function() {} | ||
messager._write = function(data, enc, callback) { | ||
transportUtil.handle_response(seneca, data, options) | ||
@@ -212,3 +264,3 @@ return callback() | ||
internals.closeConnections = function (connections, seneca) { | ||
internals.closeConnections = function(connections, seneca) { | ||
for (var i = 0, il = connections.length; i < il; ++i) { | ||
@@ -219,7 +271,6 @@ internals.destroyConnection(connections[i], seneca) | ||
internals.destroyConnection = function (connection, seneca) { | ||
internals.destroyConnection = function(connection, seneca) { | ||
try { | ||
connection.destroy() | ||
} | ||
catch (e) { | ||
} catch (e) { | ||
seneca.log.error(e) | ||
@@ -229,5 +280,5 @@ } | ||
internals.reconnect = Reconnect(function () { | ||
internals.reconnect = Reconnect(function() { | ||
var args = [].slice.call(arguments) | ||
return Net.connect.apply(null, args) | ||
}) |
/* Copyright (c) 2015-2017 Richard Rodger, MIT License */ | ||
'use strict' | ||
var Util = require('util') | ||
@@ -13,3 +12,2 @@ | ||
// Declare internals | ||
@@ -20,12 +18,15 @@ var internals = { | ||
msgmap: { | ||
'no_data': 'The message has no data.', | ||
'invalid_kind_act': 'Inbound messages should have kind "act", kind was: <%=kind%>.', | ||
'no_message_id': 'The message has no identifier.', | ||
'invalid_origin': 'The message response is not for this instance, origin was <%=origin%>.', | ||
'unknown_message_id': 'The message has an unknown identifier', | ||
'own_message': 'Inbound message rejected as originated from this server.', | ||
'message_loop': 'Inbound message rejected as looping back to this server.', | ||
'data_error': 'Inbound message included an error description.', | ||
'invalid_json': 'Invalid JSON: <%=input%>.', | ||
'unexcepted_async_error': 'Unexcepted error response to asynchronous message.' | ||
no_data: 'The message has no data.', | ||
invalid_kind_act: | ||
'Inbound messages should have kind "act", kind was: <%=kind%>.', | ||
no_message_id: 'The message has no identifier.', | ||
invalid_origin: | ||
'The message response is not for this instance, origin was <%=origin%>.', | ||
unknown_message_id: 'The message has an unknown identifier', | ||
own_message: 'Inbound message rejected as originated from this server.', | ||
message_loop: 'Inbound message rejected as looping back to this server.', | ||
data_error: 'Inbound message included an error description.', | ||
invalid_json: 'Invalid JSON: <%=input%>.', | ||
unexcepted_async_error: | ||
'Unexcepted error response to asynchronous message.' | ||
}, | ||
@@ -36,4 +37,4 @@ override: true | ||
module.exports = internals.Utils = function (context) { | ||
this._msgprefix = (!context.options.msgprefix ? '' : context.options.msgprefix) | ||
module.exports = internals.Utils = function(context) { | ||
this._msgprefix = !context.options.msgprefix ? '' : context.options.msgprefix | ||
this._context = context | ||
@@ -44,3 +45,7 @@ } | ||
internals.Utils.prototype.handle_response = function (seneca, data, client_options) { | ||
internals.Utils.prototype.handle_response = function( | ||
seneca, | ||
data, | ||
client_options | ||
) { | ||
data.time = data.time || {} | ||
@@ -77,3 +82,3 @@ data.time.client_recv = Date.now() | ||
_.each(data.error, function (value, key) { | ||
_.each(data.error, function(value, key) { | ||
err[key] = value | ||
@@ -83,7 +88,12 @@ }) | ||
if (!data.sync) { | ||
seneca.log.warn('client', 'unexcepted_async_error', client_options, data, err) | ||
seneca.log.warn( | ||
'client', | ||
'unexcepted_async_error', | ||
client_options, | ||
data, | ||
err | ||
) | ||
return true | ||
} | ||
} | ||
else { | ||
} else { | ||
result = this.handle_entity(seneca, data.res) | ||
@@ -100,4 +110,3 @@ } | ||
this._context.callmap.del(data.id) | ||
} | ||
else { | ||
} else { | ||
if (this._context.options.warn.unknown_message_id) { | ||
@@ -109,3 +118,2 @@ seneca.log.warn('client', 'unknown_message_id', client_options, data) | ||
var actinfo = { | ||
@@ -131,15 +139,19 @@ id: data.id, | ||
internals.Utils.prototype.callmeta = function (options) { | ||
internals.Utils.prototype.callmeta = function(options) { | ||
try { | ||
options.callmeta.done(options.err, options.result, options.actinfo) | ||
} catch (e) { | ||
options.seneca.log.error( | ||
'client', | ||
'callback_error', | ||
options.client_options, | ||
options.data, | ||
e.stack || e | ||
) | ||
} | ||
catch (e) { | ||
options.seneca.log.error('client', 'callback_error', options.client_options, options.data, e.stack || e) | ||
} | ||
} | ||
internals.Utils.prototype.prepare_request = function (seneca, args, done, meta) { | ||
var meta$ = (args.meta$ || meta) | ||
internals.Utils.prototype.prepare_request = function(seneca, args, done, meta) { | ||
var meta$ = args.meta$ || meta || {} | ||
// FIX: this is mutating args.meta$ - sync should be inited elsewhere | ||
@@ -157,4 +169,3 @@ meta$.sync = void 0 === meta$.sync ? true : meta$.sync | ||
this._context.callmap.set(meta$.id, callmeta) | ||
} | ||
else { | ||
} else { | ||
this.callmeta({ | ||
@@ -173,3 +184,3 @@ callmeta: callmeta, | ||
if (args.transport$) { | ||
track = _.clone((args.transport$.track || [])) | ||
track = _.clone(args.transport$.track || []) | ||
} | ||
@@ -188,2 +199,7 @@ track.push(seneca.id) | ||
// workaround to send meta.custom object go along with transport | ||
if (meta && meta.custom) { | ||
output.act.custom$ = (meta && meta.custom) || undefined | ||
} | ||
output.msg$ = { | ||
@@ -202,3 +218,8 @@ vin: 1, | ||
internals.Utils.prototype.handle_request = function (seneca, data, listen_options, respond) { | ||
internals.Utils.prototype.handle_request = function( | ||
seneca, | ||
data, | ||
listen_options, | ||
respond | ||
) { | ||
if (!data) { | ||
@@ -230,3 +251,6 @@ return respond({ input: data, error: internals.error('no_data') }) | ||
if (this._context.options.check.own_message && this._context.callmap.has(data.id)) { | ||
if ( | ||
this._context.options.check.own_message && | ||
this._context.callmap.has(data.id) | ||
) { | ||
if (this._context.options.warn.own_message) { | ||
@@ -268,12 +292,16 @@ seneca.log.warn('listen', 'own_message', listen_options, data) | ||
internals.Utils.prototype.requestAct = function (seneca, input, output, respond) { | ||
internals.Utils.prototype.requestAct = function( | ||
seneca, | ||
input, | ||
output, | ||
respond | ||
) { | ||
var self = this | ||
try { | ||
seneca.act(input, function (err, out) { | ||
seneca.act(input, function(err, out) { | ||
self.update_output(input, output, err, out) | ||
respond(output) | ||
}) | ||
} | ||
catch (e) { | ||
} catch (e) { | ||
self.catch_act_error(seneca, e, input, {}, output) | ||
@@ -284,3 +312,8 @@ respond(output) | ||
internals.Utils.prototype.make_client = function (context_seneca, make_send, client_options, client_done) { | ||
internals.Utils.prototype.make_client = function( | ||
context_seneca, | ||
make_send, | ||
client_options, | ||
client_done | ||
) { | ||
var instance = this._context.seneca | ||
@@ -293,4 +326,3 @@ | ||
make_send = context_seneca | ||
} | ||
else { | ||
} else { | ||
instance = context_seneca | ||
@@ -302,3 +334,3 @@ } | ||
var finish = function (err, send) { | ||
var finish = function(err, send) { | ||
if (err) { | ||
@@ -320,5 +352,5 @@ return client_done(err) | ||
internals.Utils.prototype.make_anyclient = function (opts, make_send, done) { | ||
internals.Utils.prototype.make_anyclient = function(opts, make_send, done) { | ||
var self = this | ||
make_send({}, this._msgprefix + 'any', function (err, send) { | ||
make_send({}, this._msgprefix + 'any', function(err, send) { | ||
if (err) { | ||
@@ -333,5 +365,7 @@ return done(err) | ||
id: opts.id || Nid(), | ||
toString: function () { return 'any-' + this.id }, | ||
toString: function() { | ||
return 'any-' + this.id | ||
}, | ||
/* | ||
/* | ||
// TODO: is this used? | ||
@@ -343,3 +377,3 @@ match: function (args) { | ||
send: function (args, done, meta) { | ||
send: function(args, done, meta) { | ||
send.call(this, args, done, meta) | ||
@@ -353,10 +387,15 @@ } | ||
internals.Utils.prototype.make_pinclient = function (opts, resolvesend, argspatrun, done) { | ||
internals.Utils.prototype.make_pinclient = function( | ||
opts, | ||
resolvesend, | ||
argspatrun, | ||
done | ||
) { | ||
var client = { | ||
id: opts.id || Nid(), | ||
toString: function () { | ||
toString: function() { | ||
return 'pin-' + argspatrun.mark + '-' + this.id | ||
}, | ||
/* | ||
/* | ||
// TODO: is this used? | ||
@@ -369,7 +408,7 @@ match: function (args) { | ||
send: function (args, done, meta) { | ||
send: function(args, done, meta) { | ||
var seneca = this | ||
var spec = argspatrun.find(args) | ||
resolvesend(spec, args, function (err, send) { | ||
resolvesend(spec, args, function(err, send) { | ||
if (err) { | ||
@@ -386,3 +425,3 @@ return done(err) | ||
internals.Utils.prototype.resolve_pins = function (opts) { | ||
internals.Utils.prototype.resolve_pins = function(opts) { | ||
var pins = opts.pin || opts.pins | ||
@@ -394,4 +433,4 @@ if (pins) { | ||
if (pins) { | ||
pins = _.map(pins, function (pin) { | ||
return (typeof pin === 'string') ? Jsonic(pin) : pin | ||
pins = _.map(pins, function(pin) { | ||
return typeof pin === 'string' ? Jsonic(pin) : pin | ||
}) | ||
@@ -403,6 +442,6 @@ } | ||
internals.Utils.prototype.make_argspatrun = function (pins) { | ||
internals.Utils.prototype.make_argspatrun = function(pins) { | ||
var argspatrun = Patrun({ gex: true }) | ||
_.each(pins, function (pin) { | ||
_.each(pins, function(pin) { | ||
var spec = { pin: pin } | ||
@@ -412,3 +451,5 @@ argspatrun.add(pin, spec) | ||
argspatrun.mark = Util.inspect(pins).replace(/\s+/g, '').replace(/\n/g, '') | ||
argspatrun.mark = Util.inspect(pins) | ||
.replace(/\s+/g, '') | ||
.replace(/\n/g, '') | ||
@@ -418,5 +459,9 @@ return argspatrun | ||
internals.Utils.prototype.make_resolvesend = function (opts, sendmap, make_send) { | ||
internals.Utils.prototype.make_resolvesend = function( | ||
opts, | ||
sendmap, | ||
make_send | ||
) { | ||
var self = this | ||
return function (spec, args, done) { | ||
return function(spec, args, done) { | ||
var topic = self.resolve_topic(opts, spec, args) | ||
@@ -428,3 +473,3 @@ var send = sendmap[topic] | ||
make_send(spec, topic, function (err, send) { | ||
make_send(spec, topic, function(err, send) { | ||
if (err) { | ||
@@ -439,6 +484,6 @@ return done(err) | ||
internals.Utils.prototype.resolve_topic = function (opts, spec, args) { | ||
internals.Utils.prototype.resolve_topic = function(opts, spec, args) { | ||
var self = this | ||
if (!spec.pin) { | ||
return function () { | ||
return function() { | ||
return self._msgprefix + 'any' | ||
@@ -451,3 +496,3 @@ } | ||
var topicargs = {} | ||
_.each(topicpin, function (v, k) { | ||
_.each(topicpin, function(v, k) { | ||
topicargs[k] = args[k] | ||
@@ -457,3 +502,3 @@ }) | ||
var sb = [] | ||
_.each(_.keys(topicargs).sort(), function (k) { | ||
_.each(_.keys(topicargs).sort(), function(k) { | ||
sb.push(k) | ||
@@ -465,7 +510,12 @@ sb.push('=') | ||
var topic = this._msgprefix + (sb.join('')).replace(/[^\w\d]+/g, '_') | ||
var topic = this._msgprefix + sb.join('').replace(/[^\w\d]+/g, '_') | ||
return topic | ||
} | ||
internals.Utils.prototype.listen_topics = function (seneca, args, listen_options, do_topic) { | ||
internals.Utils.prototype.listen_topics = function( | ||
seneca, | ||
args, | ||
listen_options, | ||
do_topic | ||
) { | ||
var self = this | ||
@@ -477,5 +527,5 @@ var topics = [] | ||
if (pins) { | ||
_.each(this._context.seneca.findpins(pins), function (pin) { | ||
_.each(this._context.seneca.findpins(pins), function(pin) { | ||
var sb = [] | ||
_.each(_.keys(pin).sort(), function (k) { | ||
_.each(_.keys(pin).sort(), function(k) { | ||
sb.push(k) | ||
@@ -487,3 +537,3 @@ sb.push('=') | ||
var topic = self._msgprefix + (sb.join('')).replace(/[^\w\d]+/g, '_') | ||
var topic = self._msgprefix + sb.join('').replace(/[^\w\d]+/g, '_') | ||
@@ -495,4 +545,3 @@ topics.push(topic) | ||
// otherwise no listener established and seneca ends without msg | ||
} | ||
else { | ||
} else { | ||
topics.push(this._msgprefix + 'any') | ||
@@ -502,3 +551,3 @@ } | ||
if (typeof do_topic === 'function') { | ||
topics.forEach(function (topic) { | ||
topics.forEach(function(topic) { | ||
do_topic(topic) | ||
@@ -511,3 +560,3 @@ }) | ||
internals.Utils.prototype.update_output = function (input, output, err, out) { | ||
internals.Utils.prototype.update_output = function(input, output, err, out) { | ||
output.res = out | ||
@@ -527,3 +576,9 @@ | ||
internals.Utils.prototype.catch_act_error = function (seneca, e, listen_options, input, output) { | ||
internals.Utils.prototype.catch_act_error = function( | ||
seneca, | ||
e, | ||
listen_options, | ||
input, | ||
output | ||
) { | ||
seneca.log.error('listen', 'act-error', listen_options, e.stack || e) | ||
@@ -534,8 +589,6 @@ output.error = e | ||
// legacy names | ||
// legacy names | ||
internals.Utils.prototype.resolvetopic = internals.Utils.prototype.resolve_topic | ||
internals.Utils.prototype.prepareResponse = function (seneca, input) { | ||
internals.Utils.prototype.prepareResponse = function(seneca, input) { | ||
return { | ||
@@ -555,3 +608,2 @@ id: input.id, | ||
// Utilities | ||
@@ -562,3 +614,3 @@ | ||
// allow user to specify operations on result | ||
internals.Utils.prototype.handle_entity = function (seneca, raw) { | ||
internals.Utils.prototype.handle_entity = function(seneca, raw) { | ||
if (!raw) { | ||
@@ -574,3 +626,3 @@ return raw | ||
_.each(raw, function (value, key) { | ||
_.each(raw, function(value, key) { | ||
if (_.isObject(value) && value.entity$) { | ||
@@ -584,8 +636,7 @@ raw[key] = seneca.make$(value) | ||
internals.Utils.prototype.close = function (seneca, closer) { | ||
seneca.add('role:seneca,cmd:close', function (close_args, done) { | ||
internals.Utils.prototype.close = function(seneca, closer) { | ||
seneca.add('role:seneca,cmd:close', function(close_args, done) { | ||
var seneca = this | ||
closer.call(seneca, function (err) { | ||
closer.call(seneca, function(err) { | ||
if (err) { | ||
@@ -600,4 +651,3 @@ seneca.log.error(err) | ||
internals.Utils.prototype.stringifyJSON = function (seneca, note, obj) { | ||
internals.Utils.prototype.stringifyJSON = function(seneca, note, obj) { | ||
if (!obj) { | ||
@@ -609,4 +659,3 @@ return | ||
return JSON.stringify(obj) | ||
} | ||
catch (e) { | ||
} catch (e) { | ||
seneca.log.warn('json-stringify', note, obj, e.message) | ||
@@ -616,4 +665,3 @@ } | ||
internals.Utils.prototype.parseJSON = function (seneca, note, str) { | ||
internals.Utils.prototype.parseJSON = function(seneca, note, str) { | ||
if (!str) { | ||
@@ -625,5 +673,9 @@ return | ||
return JSON.parse(str) | ||
} | ||
catch (e) { | ||
seneca.log.warn('json-parse', note, str.replace(/[\r\n\t]+/g, ''), e.message) | ||
} catch (e) { | ||
seneca.log.warn( | ||
'json-parse', | ||
note, | ||
str.replace(/[\r\n\t]+/g, ''), | ||
e.message | ||
) | ||
e.input = str | ||
@@ -634,4 +686,3 @@ return e | ||
internals.Utils.prototype.resolveDynamicValue = function (value, options) { | ||
internals.Utils.prototype.resolveDynamicValue = function(value, options) { | ||
if (_.isFunction(value)) { | ||
@@ -638,0 +689,0 @@ return value(options) |
{ | ||
"name": "seneca-transport", | ||
"version": "2.4.0", | ||
"version": "3.0.0", | ||
"description": "Seneca transport", | ||
@@ -24,4 +24,4 @@ "main": "transport.js", | ||
"clean-npm": "rm -rf node_modules package-lock.json", | ||
"repo-tag": "REPO_VERSION=`node -e \"console.log(require('./package').version)\"`; echo TAG: v$REPO_VERSION; git commit -a -m v$REPO_VERSION; git tag v$REPO_VERSION; git push --tags;", | ||
"repo-publish": "npm run repo-tag; npm publish --access public" | ||
"repo-tag": "REPO_VERSION=`node -e \"console.log(require('./package').version)\"` && echo TAG: v$REPO_VERSION && git commit -a -m v$REPO_VERSION && git push && git tag v$REPO_VERSION && git push --tags;", | ||
"repo-publish": "npm run prettier && npm test && npm run repo-tag && npm publish --access public" | ||
}, | ||
@@ -49,24 +49,25 @@ "contributors": [ | ||
"dependencies": { | ||
"eraro": "1.1", | ||
"gex": "0.3", | ||
"jsonic": "0.3", | ||
"lodash": "4", | ||
"lru-cache": "4.1", | ||
"ndjson": "1.5", | ||
"nid": "0.3", | ||
"patrun": "1.0", | ||
"qs": "6.5", | ||
"reconnect-core": "1.3", | ||
"wreck": "12" | ||
"eraro": "^1.1.0", | ||
"gex": "^0.3.0", | ||
"jsonic": "^0.3.1", | ||
"lodash": "^4.17.11", | ||
"lru-cache": "^4.1.5", | ||
"ndjson": "^1.5.0", | ||
"nid": "^0.3.2", | ||
"patrun": "^1.0.0", | ||
"qs": "^6.5.2", | ||
"reconnect-core": "^1.3.0", | ||
"wreck": "^12.5.1" | ||
}, | ||
"devDependencies": { | ||
"async": "2", | ||
"bench": "0", | ||
"code": "4", | ||
"coveralls": "3", | ||
"lab": "14", | ||
"seneca": "plugin", | ||
"seneca-entity": "2.3", | ||
"seneca-transport-test": "0", | ||
"sinon": "5" | ||
"async": "^2.6.2", | ||
"bench": "^0.3.6", | ||
"code": "^4.1.0", | ||
"coveralls": "^3.0.2", | ||
"lab": "^14.3.4", | ||
"prettier": "^1.16.4", | ||
"seneca": "senecajs/seneca", | ||
"seneca-entity": "^2.3.0", | ||
"seneca-transport-test": "^0.3.0", | ||
"sinon": "^5.1.1" | ||
}, | ||
@@ -73,0 +74,0 @@ "files": [ |
@@ -57,3 +57,3 @@ ![Seneca](http://senecajs.org/files/assets/seneca-logo.png) | ||
function _color_ to define the name of the plugin (see [How to write a | ||
Seneca plugin](http://senecajs.org/tutorials/how-to-write-a-plugin.html)). | ||
Seneca plugin](http://senecajs.org/docs/tutorials/how-to-write-a-plugin.html)). | ||
@@ -60,0 +60,0 @@ ```js |
@@ -51,3 +51,3 @@ /* Copyright (c) 2013-2015 Richard Rodger & other contributors, MIT License */ | ||
module.exports = function transport (options) { | ||
module.exports = function transport(options) { | ||
var seneca = this | ||
@@ -63,19 +63,46 @@ | ||
seneca.add({ role: internals.plugin, cmd: 'inflight' }, internals.inflight(callmap)) | ||
seneca.add( | ||
{ role: internals.plugin, cmd: 'inflight' }, | ||
internals.inflight(callmap) | ||
) | ||
seneca.add({ role: internals.plugin, cmd: 'listen' }, internals.listen) | ||
seneca.add({ role: internals.plugin, cmd: 'client' }, internals.client) | ||
seneca.add({ role: internals.plugin, hook: 'listen', type: 'tcp' }, Tcp.listen(settings, transportUtil)) | ||
seneca.add({ role: internals.plugin, hook: 'client', type: 'tcp' }, Tcp.client(settings, transportUtil)) | ||
seneca.add( | ||
{ role: internals.plugin, hook: 'listen', type: 'tcp' }, | ||
Tcp.listen(settings, transportUtil) | ||
) | ||
seneca.add( | ||
{ role: internals.plugin, hook: 'client', type: 'tcp' }, | ||
Tcp.client(settings, transportUtil) | ||
) | ||
seneca.add({ role: internals.plugin, hook: 'listen', type: 'web' }, Http.listen(settings, transportUtil)) | ||
seneca.add({ role: internals.plugin, hook: 'client', type: 'web' }, Http.client(settings, transportUtil)) | ||
seneca.add( | ||
{ role: internals.plugin, hook: 'listen', type: 'web' }, | ||
Http.listen(settings, transportUtil) | ||
) | ||
seneca.add( | ||
{ role: internals.plugin, hook: 'client', type: 'web' }, | ||
Http.client(settings, transportUtil) | ||
) | ||
// Aliases. | ||
seneca.add({ role: internals.plugin, hook: 'listen', type: 'http' }, Http.listen(settings, transportUtil)) | ||
seneca.add({ role: internals.plugin, hook: 'client', type: 'http' }, Http.client(settings, transportUtil)) | ||
seneca.add( | ||
{ role: internals.plugin, hook: 'listen', type: 'http' }, | ||
Http.listen(settings, transportUtil) | ||
) | ||
seneca.add( | ||
{ role: internals.plugin, hook: 'client', type: 'http' }, | ||
Http.client(settings, transportUtil) | ||
) | ||
// Legacy API. | ||
seneca.add({ role: internals.plugin, hook: 'listen', type: 'direct' }, Http.listen(settings, transportUtil)) | ||
seneca.add({ role: internals.plugin, hook: 'client', type: 'direct' }, Http.client(settings, transportUtil)) | ||
seneca.add( | ||
{ role: internals.plugin, hook: 'listen', type: 'direct' }, | ||
Http.listen(settings, transportUtil) | ||
) | ||
seneca.add( | ||
{ role: internals.plugin, hook: 'client', type: 'direct' }, | ||
Http.client(settings, transportUtil) | ||
) | ||
@@ -89,3 +116,3 @@ return { | ||
module.exports.preload = function () { | ||
module.exports.preload = function() { | ||
var seneca = this | ||
@@ -96,3 +123,3 @@ | ||
exportmap: { | ||
utils: function () { | ||
utils: function() { | ||
var transportUtil = seneca.export(internals.plugin).utils | ||
@@ -109,6 +136,6 @@ if (transportUtil !== meta.exportmap.utils) { | ||
internals.inflight = function (callmap) { | ||
return function (args, callback) { | ||
internals.inflight = function(callmap) { | ||
return function(args, callback) { | ||
var inflight = {} | ||
callmap.forEach(function (val, key) { | ||
callmap.forEach(function(val, key) { | ||
inflight[key] = val | ||
@@ -120,6 +147,9 @@ }) | ||
internals.listen = function (args, callback) { | ||
internals.listen = function(args, callback) { | ||
var seneca = this | ||
var config = _.extend({}, args.config, { role: internals.plugin, hook: 'listen' }) | ||
var config = _.extend({}, args.config, { | ||
role: internals.plugin, | ||
hook: 'listen' | ||
}) | ||
var listen_args = seneca.util.clean(_.omit(config, 'cmd')) | ||
@@ -133,6 +163,9 @@ var legacyError = internals.legacyError(seneca, listen_args.type) | ||
internals.client = function (args, callback) { | ||
internals.client = function(args, callback) { | ||
var seneca = this | ||
var config = _.extend({}, args.config, { role: internals.plugin, hook: 'client' }) | ||
var config = _.extend({}, args.config, { | ||
role: internals.plugin, | ||
hook: 'client' | ||
}) | ||
var client_args = seneca.util.clean(_.omit(config, 'cmd')) | ||
@@ -146,3 +179,3 @@ var legacyError = internals.legacyError(seneca, client_args.type) | ||
internals.legacyError = function (seneca, type) { | ||
internals.legacyError = function(seneca, type) { | ||
if (type === 'pubsub') { | ||
@@ -149,0 +182,0 @@ return seneca.fail('plugin-needed', { name: 'seneca-redis-transport' }) |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
72391
1234
10
+ Addedcall-bind@1.0.7(transitive)
+ Addeddefine-data-property@1.1.4(transitive)
+ Addedes-define-property@1.0.0(transitive)
+ Addedes-errors@1.3.0(transitive)
+ Addedfunction-bind@1.1.2(transitive)
+ Addedget-intrinsic@1.2.4(transitive)
+ Addedgopd@1.0.1(transitive)
+ Addedhas-property-descriptors@1.0.2(transitive)
+ Addedhas-proto@1.0.3(transitive)
+ Addedhas-symbols@1.0.3(transitive)
+ Addedhasown@2.0.2(transitive)
+ Addedobject-inspect@1.13.3(transitive)
+ Addedqs@6.13.1(transitive)
+ Addedset-function-length@1.2.2(transitive)
+ Addedside-channel@1.0.6(transitive)
- Removedqs@6.5.3(transitive)
Updatederaro@^1.1.0
Updatedgex@^0.3.0
Updatedjsonic@^0.3.1
Updatedlodash@^4.17.11
Updatedlru-cache@^4.1.5
Updatedndjson@^1.5.0
Updatednid@^0.3.2
Updatedpatrun@^1.0.0
Updatedqs@^6.5.2
Updatedreconnect-core@^1.3.0
Updatedwreck@^12.5.1