New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

seneca-transport

Package Overview
Dependencies
Maintainers
2
Versions
50
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

seneca-transport - npm Package Compare versions

Comparing version 0.7.2 to 0.8.0

LICENSE

865

lib/transport-utils.js
/* Copyright (c) 2015 Richard Rodger, MIT License */
/* jshint node:true, asi:true, eqnull:true */
"use strict";
'use strict'
// Load modules
var Util = require('util')
var _ = require('lodash')
var Nid = require('nid')
var Patrun = require('patrun')
var Gex = require('gex')
var Jsonic = require('jsonic')
var Eraro = require('eraro')
var util = require('util')
// Declare internals
var internals = {
error: Eraro({
package: 'seneca',
msgmap: {
'no_data': 'The message has no data.',
'invalid_kind_act': 'Inbound messages should have kind "act", kind was: <%=kind%>.',
'no_message_id': 'The message has no identifier.',
'invalid_origin': 'The message response is not for this instance, origin was <%=origin%>.',
'unknown_message_id': 'The message has an unknown identifier',
'own_message': 'Inbound message rejected as originated from this server.',
'message_loop': 'Inbound message rejected as looping back to this server.',
'data_error': 'Inbound message included an error description.',
'invalid_json': 'Invalid JSON: <%=input%>.'
},
override: true
})
}
module.exports = internals.Utils = function (context) {
this._msgprefix = (!context.options.msgprefix ? '' : context.options.msgprefix)
this._context = context
}
var _ = require('lodash')
var nid = require('nid')
var patrun = require('patrun')
var gex = require('gex')
var jsonic = require('jsonic')
internals.Utils.prototype.handle_response = function (seneca, data, client_options) {
data.time = data.time || {}
data.time.client_recv = Date.now()
var error = require('eraro')({
package: 'seneca',
msgmap: ERRMSGMAP(),
override: true
})
if (data.kind !== 'res') {
if (this._context.options.warn.invalid_kind) {
seneca.log.warn('client', 'invalid_kind_res', client_options, data)
}
return false
}
if (data.id === null) {
if (this._context.options.warn.no_message_id) {
seneca.log.warn('client', 'no_message_id', client_options, data)
}
return false
}
if (seneca.id !== data.origin) {
if (this._context.options.warn.invalid_origin) {
seneca.log.warn('client', 'invalid_origin', client_options, data)
}
return false
}
module.exports = function( ctxt ) {
var msgprefix = (null == ctxt.options.msgprefix ? '' : ctxt.options.msgprefix)
var callmeta = this._context.callmap.get(data.id)
var tu = {
if (callmeta) {
this._context.callmap.del(data.id)
}
else {
if (this._context.options.warn.unknown_message_id) {
seneca.log.warn('client', 'unknown_message_id', client_options, data)
}
return false
}
prepare_response: function prepare_response( seneca, input ) {
return {
id: input.id,
kind: 'res',
origin: input.origin,
accept: seneca.id,
track: input.track,
time: {
client_sent: (input.time && input.time.client_sent) || 0,
listen_recv: Date.now()
},
}
},
var err = null
var result = null
if (data.error) {
err = new Error(data.error.message)
handle_response: function handle_response( seneca, data, client_options ) {
data.time = data.time || {}
data.time.client_recv = Date.now()
_.each(data.error, function (value, key) {
err[key] = value
})
}
else {
result = this.handle_entity(data.res)
}
if( 'res' !== data.kind ) {
if( ctxt.options.warn.invalid_kind ) {
seneca.log.warn('client', 'invalid_kind_res', client_options, data)
}
return false
}
var actinfo = {
id: data.id,
accept: data.accept,
track: data.track,
time: data.time
}
if( null == data.id ) {
if( ctxt.options.warn.no_message_id ) {
seneca.log.warn('client', 'no_message_id', client_options, data);
}
return false;
}
this.callmeta({
callmeta: callmeta,
err: err,
result: result,
actinfo: actinfo,
seneca: seneca,
client_options: client_options,
data: data
})
if( seneca.id !== data.origin ) {
if( ctxt.options.warn.invalid_origin ) {
seneca.log.warn('client', 'invalid_origin', client_options, data)
}
return false
}
return true
}
internals.Utils.prototype.callmeta = function (options) {
try {
options.callmeta.done(options.err, options.result, options.actinfo)
}
catch (e) {
options.seneca.log.error('client', 'callback_error', options.client_options, options.data, e.stack || e)
}
}
var callmeta = ctxt.callmap.get(data.id)
internals.Utils.prototype.prepare_request = function (seneca, args, done) {
var callmeta = {
args: args,
done: _.bind(done, seneca),
when: Date.now()
}
if( callmeta ) {
ctxt.callmap.del( data.id )
}
else {
if( ctxt.options.warn.unknown_message_id ) {
seneca.log.warn('client', 'unknown_message_id', client_options, data);
}
return false;
}
this._context.callmap.set(args.meta$.id, callmeta)
var err = null
var result = null
var track = []
if (args.transport$) {
track = _.clone((args.transport$.track || []))
}
track.push(seneca.id)
if( data.error ) {
err = new Error( data.error.message )
var output = {
id: args.meta$.id,
kind: 'act',
origin: seneca.id,
track: track,
time: { client_sent: Date.now() },
act: seneca.util.clean(args)
}
_.each(data.error,function(v,k){
err[k] = v
})
}
else {
result = tu.handle_entity(data.res)
}
var actinfo = {
id: data.id,
accept: data.accept,
track: data.track,
time: data.time
}
return output
}
try {
callmeta.done( err, result, actinfo )
}
catch(e) {
seneca.log.error(
'client', 'callback_error', client_options, data, e.stack||e)
}
internals.Utils.prototype.handle_request = function (seneca, data, listen_options, respond) {
if (!data) {
return respond({ input: data, error: internals.error('no_data') })
}
return true;
},
if (data.kind !== 'act') {
if (this._context.options.warn.invalid_kind) {
seneca.log.warn('listen', 'invalid_kind_act', listen_options, data)
}
return respond({
input: data,
error: internals.error('invalid_kind_act', { kind: data.kind })
})
}
if (data.id === null) {
if (this._context.options.warn.no_message_id) {
seneca.log.warn('listen', 'no_message_id', listen_options, data)
}
return respond({ input: data, error: internals.error('no_message_id') })
}
prepare_request: function prepare_request( seneca, args, done ) {
var callmeta = {
args: args,
done: _.bind(done,seneca),
when: Date.now()
if (this._context.options.check.own_message && this._context.callmap.has(data.id)) {
if (this._context.options.warn.own_message) {
seneca.log.warn('listen', 'own_message', listen_options, data)
}
return respond({ input: data, error: internals.error('own_message') })
}
if (this._context.options.check.message_loop && Array.isArray(data.track)) {
for (var i = 0; i < data.track.length; i++) {
if (seneca.id === data.track[i]) {
if (this._context.options.warn.message_loop) {
seneca.log.warn('listen', 'message_loop', listen_options, data)
}
return respond({ input: data, error: internals.error('message_loop') })
}
}
}
ctxt.callmap.set(args.meta$.id,callmeta)
if (data.error) {
seneca.log.error('listen', 'data_error', listen_options, data)
return respond({ input: data, error: internals.error('data_error') })
}
var track = []
if( args.transport$ ) {
track = _.clone((args.transport$.track||[]))
}
track.push(seneca.id)
var output = this.prepareResponse(seneca, data)
var input = this.handle_entity(data.act)
var output = {
id: args.meta$.id,
kind: 'act',
origin: seneca.id,
track: track,
time: { client_sent: Date.now() },
act: seneca.util.clean(args),
}
input.transport$ = {
track: data.track || [],
origin: data.origin,
time: data.time
}
return output;
},
input.id$ = data.id
this.requestAct(seneca, input, output, respond)
}
handle_request: function handle_request(
seneca, data, listen_options, respond
) {
if( null == data ) return respond({input:data,error:error('no_data')});
internals.Utils.prototype.requestAct = function (seneca, input, output, respond) {
var self = this
if( 'act' != data.kind ) {
if( ctxt.options.warn.invalid_kind ) {
seneca.log.warn('listen', 'invalid_kind_act', listen_options, data)
}
return respond({
input:data,
error:error('invalid_kind_act',{kind:data.kind})
});
}
try {
seneca.act(input, function (err, out) {
self.update_output(input, output, err, out)
respond(output)
})
}
catch (e) {
self.catch_act_error(seneca, e, input, {}, output)
respond(output)
}
}
if( null == data.id ) {
if( ctxt.options.warn.no_message_id ) {
seneca.log.warn('listen', 'no_message_id', listen_options, data)
}
return respond({input:data,error:error('no_message_id')});
}
internals.Utils.prototype.make_client = function (context_seneca, make_send, client_options, client_done) {
var instance = this._context.seneca
if( ctxt.options.check.own_message && ctxt.callmap.has(data.id) ) {
if( ctxt.options.warn.own_message ) {
seneca.log.warn('listen', 'own_message', listen_options, data)
}
return respond({input:data,error:error('own_message')});
}
// legacy api
if (!context_seneca.seneca) {
client_done = client_options
client_options = make_send
make_send = context_seneca
}
else {
instance = context_seneca
}
if( ctxt.options.check.message_loop && _.isArray(data.track) ) {
for( var i = 0; i < data.track.length; i++ ) {
if( seneca.id === data.track[i] ) {
if( ctxt.options.warn.message_loop ) {
seneca.log.warn('listen', 'message_loop', listen_options, data)
}
return respond({input:data,error:error('message_loop')});
}
}
}
var pins = this.resolve_pins(client_options)
instance.log.debug('client', client_options, pins || 'any')
if( data.error ) {
seneca.log.error('listen', 'data_error', listen_options, data )
return respond({input:data,error:error('data_error')});
}
var finish = function (err, send) {
if (err) {
return client_done(err)
}
client_done(null, send)
}
var output = tu.prepare_response( seneca, data )
var input = tu.handle_entity( data.act )
if (pins) {
var argspatrun = this.make_argspatrun(pins)
var resolvesend = this.make_resolvesend(client_options, {}, make_send)
input.transport$ = {
track: data.track || [],
origin: data.origin,
time: data.time
}
return this.make_pinclient(resolvesend, argspatrun, finish)
}
input.id$ = data.id
this.make_anyclient(client_options, make_send, finish)
}
try {
seneca.act( input, function( err, out ) {
tu.update_output(input,output,err,out)
respond(output)
})
}
catch(e) {
tu.catch_act_error( seneca, e, listen_options, data, output )
respond(output)
}
},
internals.Utils.prototype.make_anyclient = function (opts, make_send, done) {
var self = this
make_send({}, this._msgprefix + 'any', function (err, send) {
if (err) {
return done(err)
}
if (typeof send !== 'function') {
return done(self._context.seneca.fail('null-client', { opts: opts }))
}
var client = {
id: Nid(),
toString: function () { return 'any-' + this.id },
make_client: function make_client(
context_seneca,
make_send,
client_options,
client_done
) {
var instance = ctxt.seneca
// TODO: is this used?
match: function (args) {
return !this.has(args)
},
// legacy api
if( !context_seneca.seneca ) {
client_done = client_options
client_options = make_send
make_send = context_seneca
send: function (args, done) {
send.call(this, args, done)
}
else {
instance = context_seneca
}
}
var pins = tu.resolve_pins( client_options )
instance.log.debug( 'client', client_options, pins||'any' )
done(null, client)
})
}
if( pins ) {
var argspatrun = tu.make_argspatrun( pins )
var resolvesend = tu.make_resolvesend( client_options, {}, make_send )
internals.Utils.prototype.make_pinclient = function (resolvesend, argspatrun, done) {
var client = {
id: Nid(),
toString: function () {
return 'pin-' + argspatrun.mark + '-' + this.id
},
tu.make_pinclient( resolvesend, argspatrun, function( err, send ) {
if( err ) return client_done(err);
client_done( null, send )
})
}
else {
tu.make_anyclient( client_options, make_send, function( err, send ) {
if( err ) return client_done(err);
client_done( null, send )
})
}
// TODO: is this used?
match: function (args) {
var match = !!argspatrun.find(args)
return match
},
make_anyclient: function make_anyclient( opts, make_send, done ) {
make_send( {}, msgprefix+'any', function( err, send ) {
if( err ) return done(err);
if( !_.isFunction(send) )
return done(ctxt.seneca.fail('null-client',{opts:opts}));
var client = {
id: nid(),
toString: function(){ return 'any-'+this.id },
// TODO: is this used?
match: function( args ) {
return !this.has(args)
},
send: function( args, done ) {
send.call(this,args,done)
}
send: function (args, done) {
var seneca = this
var spec = argspatrun.find(args)
resolvesend(spec, args, function (err, send) {
if (err) {
return done(err)
}
done( null, client )
send.call(seneca, args, done)
})
},
}
}
done(null, client)
}
make_pinclient: function make_pinclient( resolvesend, argspatrun, done ) {
var client = {
id: nid(),
toString: function(){ return 'pin-'+argspatrun.mark+'-'+this.id },
internals.Utils.prototype.resolve_pins = function (opts) {
var pins = opts.pin || opts.pins
if (pins) {
pins = Array.isArray(pins) ? pins : [pins]
}
// TODO: is this used?
match: function( args ) {
var match = !!argspatrun.find(args)
return match
},
if (pins) {
pins = _.map(pins, function (pin) {
return (typeof pin === 'string') ? Jsonic(pin) : pin
})
}
send: function( args, done ) {
var seneca = this
var spec = argspatrun.find(args)
resolvesend(spec,args,function(err, send){
if( err ) return done(err);
send.call(seneca,args,done)
})
}
return pins
}
internals.Utils.prototype.make_argspatrun = function (pins) {
var argspatrun = Patrun(function (pat, data) {
var gexers = {}
_.each(pat, function (val, key) {
if ((typeof val === 'string') && ~val.indexOf('*')) {
delete pat[key]
gexers[key] = Gex(val)
}
})
done( null, client )
},
// handle previous patterns that match this pattern
var prev = this.list(pat)
var prevfind = prev[0] && prev[0].find
var prevdata = prev[0] && this.findexact(prev[0].match)
return function (args, data) {
var out = data
_.each(gexers, function (g, k) {
var v = (args[k] === null) ? '' : args[k]
if (g.on(v) === null) {
out = null
}
})
resolve_pins: function resolve_pins( opts ) {
var pins = opts.pin || opts.pins
if( pins ) {
pins = _.isArray(pins) ? pins : [pins]
if (prevfind && out === null) {
out = prevfind.call(this, args, prevdata)
}
if( pins ) {
pins = _.map(pins,function(pin){
return _.isString(pin) ? jsonic(pin) : pin
})
}
return out
}
})
return pins
},
_.each(pins, function (pin) {
var spec = { pin: pin }
argspatrun.add(pin, spec)
})
argspatrun.mark = Util.inspect(pins).replace(/\s+/g, '').replace(/\n/g, '')
// can handle glob expressions :)
make_argspatrun: function make_argspatrun( pins ) {
var argspatrun = patrun(function(pat,data) {
var gexers = {}
_.each(pat, function(v,k) {
if( _.isString(v) && ~v.indexOf('*') ) {
delete pat[k]
gexers[k] = gex(v)
}
})
return argspatrun
}
// handle previous patterns that match this pattern
var prev = this.list(pat)
var prevfind = prev[0] && prev[0].find
var prevdata = prev[0] && this.findexact(prev[0].match)
internals.Utils.prototype.make_resolvesend = function (opts, sendmap, make_send) {
var self = this
return function (spec, args, done) {
var topic = self.resolve_topic(opts, spec, args)
var send = sendmap[topic]
if (send) {
return done(null, send)
}
return function(args,data) {
var out = data
_.each(gexers,function(g,k) {
var v = null==args[k]?'':args[k]
if( null == g.on( v ) ) { out = null }
})
make_send(spec, topic, function (err, send) {
if (err) {
return done(err)
}
sendmap[topic] = send
done(null, send)
})
}
}
if( prevfind && null == out ) {
out = prevfind.call(this,args,prevdata)
}
internals.Utils.prototype.resolve_topic = function (opts, spec, args) {
var self = this
if (!spec.pin) {
return function () {
return self._msgprefix + 'any'
}
}
return out
}
})
var topicpin = _.clone(spec.pin)
_.each( pins, function( pin ) {
var spec = { pin:pin }
argspatrun.add(pin,spec)
})
var topicargs = {}
_.each(topicpin, function (v, k) {
topicargs[k] = args[k]
})
argspatrun.mark = util.inspect(pins).replace(/\s+/g, '').replace(/\n/g, '')
var sb = []
_.each(_.keys(topicargs).sort(), function (k) {
sb.push(k)
sb.push('=')
sb.push(topicargs[k])
sb.push(',')
})
return argspatrun
},
var topic = this._msgprefix + (sb.join('')).replace(/[^\w\d]+/g, '_')
return topic
}
internals.Utils.prototype.listen_topics = function (seneca, args, listen_options, do_topic) {
var self = this
var topics = []
make_resolvesend: function make_resolvesend( opts, sendmap, make_send ) {
return function( spec, args, done ) {
var topic = tu.resolve_topic(opts,spec,args)
var send = sendmap[topic]
if( send ) return done(null,send);
var pins = this.resolve_pins(args)
make_send(spec,topic,function(err,send){
if( err ) return done(err)
sendmap[topic] = send
done(null,send)
})
}
},
resolve_topic: function resolve_topic( opts, spec, args ) {
if( !spec.pin ) return function() { return msgprefix+'any' }
var topicpin = _.clone(spec.pin)
var topicargs = {}
_.each(topicpin, function(v,k) { topicargs[k]=args[k] })
if (pins) {
_.each(this._context.seneca.findpins(pins), function (pin) {
var sb = []
_.each( _.keys(topicargs).sort(), function(k){
_.each(_.keys(pin).sort(), function (k) {
sb.push(k)
sb.push('=')
sb.push(topicargs[k])
sb.push(pin[k])
sb.push(',')
})
var topic = msgprefix+(sb.join('')).replace(/[^\w\d]+/g,'_')
return topic;
},
var topic = self._msgprefix + (sb.join('')).replace(/[^\w\d]+/g, '_')
topics.push(topic)
})
listen_topics: function listen_topics( seneca, args, listen_options, do_topic ) {
var topics = []
// TODO: die if no pins!!!
// otherwise no listener established and seneca ends without msg
}
else {
topics.push(this._msgprefix + 'any')
}
var pins = tu.resolve_pins( args )
if (typeof do_topic === 'function') {
topics.forEach(function (topic) {
do_topic(topic)
})
}
if( pins ) {
_.each( ctxt.seneca.findpins( pins ), function(pin) {
return topics
}
var sb = []
_.each( _.keys(pin).sort(), function(k){
sb.push(k)
sb.push('=')
sb.push(pin[k])
sb.push(',')
})
internals.Utils.prototype.update_output = function (input, output, err, out) {
output.res = out
var topic = msgprefix+(sb.join('')).replace(/[^\w\d]+/g,'_')
if (err) {
var errobj = _.extend({}, err)
errobj.message = err.message
errobj.name = err.name || 'Error'
output.error = errobj
output.input = input
}
topics.push(topic)
})
output.time.listen_sent = Date.now()
}
// TODO: die if no pins!!!
// otherwise no listener established and seneca ends without msg
}
else {
topics.push( msgprefix+'any' )
}
internals.Utils.prototype.catch_act_error = function (seneca, e, listen_options, input, output) {
seneca.log.error('listen', 'act-error', listen_options, e.stack || e)
output.error = e
output.input = input
}
if( _.isFunction( do_topic ) ) {
topics.forEach( function(topic){
do_topic( topic )
})
}
return topics;
},
update_output: function update_output( input, output, err, out ) {
output.res = out
if( err ) {
var errobj = _.extend({},err)
errobj.message = err.message
errobj.name = err.name || 'Error'
output.error = errobj
output.input = input
}
output.time.listen_sent = Date.now()
},
catch_act_error: function catch_act_error(
seneca, e, listen_options, input, output
) {
seneca.log.error('listen', 'act-error', listen_options, e.stack || e )
output.error = e
output.input = input
},
// only support first level
// interim measure - deal with this in core seneca act api
// allow user to specify operations on result
handle_entity: function handle_entity( raw ) {
if( null == raw ) return raw;
internals.Utils.prototype.handle_entity = function (raw) {
var self = this
if (!raw) {
return raw
}
raw = _.isObject( raw ) ? raw : {}
if( raw.entity$ ) {
return ctxt.seneca.make$( raw )
}
else {
_.each( raw, function(v,k) {
if( _.isObject(v) && v.entity$ ) {
raw[k] = ctxt.seneca.make$( v )
}
})
return raw
}
},
raw = _.isObject(raw) ? raw : {}
if (raw.entity$) {
return this._context.seneca.make$(raw)
}
close: function close( seneca, closer ) {
seneca.add('role:seneca,cmd:close',function( close_args, done ) {
var seneca = this
_.each(raw, function (value, key) {
if (_.isObject(value) && value.entity$) {
raw[key] = self._context.seneca.make$(value)
}
})
return raw
}
closer.call( seneca, function( err ) {
if( err ) seneca.log.error(err);
// legacy names
internals.Utils.prototype.resolvetopic = internals.Utils.prototype.resolve_topic
seneca.prior(close_args,done)
})
})
},
internals.Utils.prototype.close = function (seneca, closer) {
seneca.add('role:seneca,cmd:close', function (close_args, done) {
var seneca = this
parseJSON: function parseJSON( seneca, note, str ) {
if( str ) {
try {
return JSON.parse( str )
}
catch( e ) {
seneca.log.warn( 'json-parse', note, str.replace(/[\r\n\t]+/g,''), e.message )
e.input = str
return e;
}
closer.call(seneca, function (err) {
if (err) {
seneca.log.error(err)
}
},
seneca.prior(close_args, done)
})
})
}
stringifyJSON: function stringifyJSON( seneca, note, obj ) {
if( obj ) {
try {
return JSON.stringify( obj )
}
catch( e ) {
seneca.log.warn( 'json-stringify', note, obj, e.message )
}
}
},
internals.Utils.prototype.stringifyJSON = function (seneca, note, obj) {
if (!obj) {
return
}
try {
return JSON.stringify(obj)
}
catch (e) {
seneca.log.warn('json-stringify', note, obj, e.message)
}
}
internals.Utils.prototype.parseJSON = function (seneca, note, str) {
if (!str) {
return
}
// legacy names
tu.resolvetopic = tu.resolve_topic
return tu;
try {
return JSON.parse(str)
}
catch (e) {
seneca.log.warn('json-parse', note, str.replace(/[\r\n\t]+/g, ''), e.message)
e.input = str
return e
}
}
// Error code messages.
function ERRMSGMAP() {
internals.Utils.prototype.prepareResponse = function (seneca, input) {
return {
'no_data':'The message has no data.',
'invalid_kind_act':'Inbound messages should have kind "act", kind was: <%=kind%>.',
'no_message_id':'The message has no identifier.',
'invalid_origin':'The message response is not for this instance, origin was <%=origin%>.',
'unknown_message_id':'The message has an unknown identifier',
'own_message':'Inbound message rejected as originated from this server.',
'message_loop':'Inbound message rejected as looping back to this server.',
'data_error':'Inbound message included an error description.',
id: input.id,
kind: 'res',
origin: input.origin,
accept: seneca.id,
track: input.track,
time: {
client_sent: (input.time && input.time.client_sent) || 0,
listen_recv: Date.now()
}
}
}
internals.Utils.prototype.error = internals.error
{
"name": "seneca-transport",
"version": "0.7.2",
"version": "0.8.0",
"description": "Seneca transport",
"main": "transport.js",
"scripts": {
"test": "./test.sh",
"build": "./build.sh"
"test": "./node_modules/.bin/lab test/*.test.js -v -m 3000 -t 80 -L",
"build": "./build.sh",
"bench": "node bench",
"annotate": "docco transport.js -o doc"
},

@@ -22,15 +24,13 @@ "repository": {

"dependencies": {
"async": "0.9.2",
"connect": "3.0.2",
"connect-query": "0.2.0",
"connect-timeout": "1.5.0",
"eraro": "0.4.1",
"gex": "0.2.0",
"jsonic": "0.2.0",
"lodash": "2.4.2",
"lru-cache": "2.6.4",
"needle": "0.9.2",
"eraro": "0.4.x",
"gex": "0.2.x",
"jsonic": "0.2.x",
"lodash": "3.10.x",
"lru-cache": "2.7.x",
"ndjson": "1.4.x",
"nid": "0.3.2",
"patrun": "0.4.3",
"reconnect-net": "0.0.0"
"patrun": "0.5.x",
"qs": "5.2.0",
"reconnect-core": "1.1.x",
"wreck": "6.3.x"
},

@@ -44,8 +44,13 @@ "files": [

"devDependencies": {
"async": "1.5.x",
"bench": "0.3.x",
"code": "1.x.x",
"docco": "0.7.x",
"eslint-config-seneca": "1.x.x",
"eslint-plugin-hapi": "2.x.x",
"eslint-plugin-standard": "1.x.x",
"lab": "6.x.x",
"seneca": "plugin",
"docco": "0.7.0",
"jshint": "2.8.0",
"mocha": "2.1.0",
"seneca-transport-test": "0.1.3"
"seneca-transport-test": "0.2.x"
}
}

@@ -1,5 +0,7 @@

seneca-transport - a [Seneca](http://senecajs.org) plugin
=========================================================
![Seneca](http://senecajs.org/files/assets/seneca-logo.png)
> A [Seneca.js][] transport plugin
## Seneca Transport Plugin
# seneca-transport
[![Build Status][travis-badge]][travis-url]
[![Gitter][gitter-badge]][gitter-url]

@@ -11,28 +13,22 @@ This plugin provides the HTTP and TCP transport channels for

For a gentle introduction to Seneca itself, see the
[senecajs.org](http://senecajs.org) site.
- __Version:__ 0.2.3
- __Tested on:__ Seneca 0.7
- __Node:__ 0.10, 0.12, 4
seneca-transport's source can be read in an annotated fashion by,
- running `npm run annotate`
- viewing [./doc/transport.html]() locally
## Support
Current Version: 0.7.2
Tested on: [Seneca](//github.com/rjrodger/seneca) 0.6.3
[![Build Status](https://travis-ci.org/rjrodger/seneca-transport.png?branch=master)](https://travis-ci.org/rjrodger/seneca-transport)
Built and tested against versions: `0.10, 0.11, 0.12, iojs`
[Annotated Source](http://rjrodger.github.io/seneca-transport/doc/transport.html)
If you're using this module, and need help, you can:
* Post a [github issue](//github.com/rjrodger/seneca-transport/issues),
* Tweet to [@senecajs](http://twitter.com/senecajs),
* Ask on the [![Gitter chat](https://badges.gitter.im/rjrodger/seneca-transport.png)](https://gitter.im/rjrodger/seneca-transport).
- Post a [github issue][],
- Tweet to [@senecajs][],
- Ask on the [Gitter][gitter-url].
If you are new to Seneca in general, please take a look at [senecajs.org][]. We have everything from
tutorials to sample apps to help get you up and running quickly.
### Install
This plugin module is included in the main Seneca module:
This plugin module is included in the main Seneca module,

@@ -77,3 +73,3 @@ ```sh

var seneca = require('seneca')
seneca()

@@ -121,3 +117,3 @@ .use(color)

```sh
[TIME] vy../..15/- DEBUG act - - IN 485n.. color:red {color=red} CLIENT
[TIME] vy../..15/- DEBUG act - - IN 485n.. color:red {color=red} CLIENT
[TIME] ly../..80/- DEBUG act color - IN 485n.. color:red {color=red} f2rv..

@@ -170,3 +166,3 @@ [TIME] ly../..80/- DEBUG act color - OUT 485n.. color:red {hex=#FF0000} f2rv..

[TIME] ly../..80/- DEBUG plugin color - ADD f2rv.. color:red
[TIME] vy../..15/- DEBUG act - - IN 485n.. color:red {color=red} CLIENT
[TIME] vy../..15/- DEBUG act - - IN 485n.. color:red {color=red} CLIENT
[TIME] ly../..80/- DEBUG act color - IN 485n.. color:red {color=red} f2rv..

@@ -194,3 +190,3 @@ [TIME] ly../..80/- DEBUG act color - OUT 485n.. color:red {hex=#FF0000} f2rv..

var seneca = require('seneca')
seneca()

@@ -211,3 +207,3 @@ .use(color)

var seneca = require('seneca')
seneca()

@@ -248,3 +244,3 @@ .client()

> Content-Type: application/x-www-form-urlencoded
>
>
* upload completely sent off: 15 out of 15 bytes

@@ -264,3 +260,3 @@ < HTTP/1.1 200 OK

< Connection: keep-alive
<
<
* Connection #0 to host localhost left intact

@@ -376,3 +372,3 @@ {"hex":"#FF0000"}

[TIME] f1../..79/- INFO hello Seneca/0.5.20/f1../..79/-
[TIME] f1../..79/- DEBUG act - - IN wdfw.. color:red {color=red} CLIENT
[TIME] f1../..79/- DEBUG act - - IN wdfw.. color:red {color=red} CLIENT
[TIME] 6g../..49/- INFO plugin transport - ACT b01d.. listen open {type=tcp,host=0.0.0.0,port=10201,...}

@@ -647,6 +643,6 @@ [TIME] f1../..79/- INFO plugin transport - ACT nid1.. client {type=tcp,host=0.0.0.0,port=10201,...} any

* _time_:
* _client_sent_: client timestamp when message sent
* _listen_recv_: server timestamp when message received
* _listen_sent_: server timestamp when response sent
* _client_recv_: client timestamp when response received
* _client_sent_: client timestamp when message sent
* _listen_recv_: server timestamp when message received
* _listen_sent_: server timestamp when response sent
* _client_recv_: client timestamp when response received
* _act_: action message data, as submitted to Seneca

@@ -702,5 +698,5 @@ * _res_: response message data, as provided by Seneca

function make_send( spec, topic, send_done ) {
// setup topic in transport mechanism
// send the args over the transport

@@ -745,3 +741,3 @@ send_done( null, function( args, done ) {

if( null == out ) return ...;
// otherwise, send the result back

@@ -784,3 +780,3 @@ // don't forget to stringifyJSON(out) if necessary

The primary options are:
* _msgprefix_: a string to prefix to topic names so that they are namespaced

@@ -878,14 +874,2 @@ * _callmax_: the maximum number of in-flight request/response messages to cache

## Testing
This module itself does not contain any direct reference to Seneca, as
it is a Seneca dependency. However, Seneca is needed to test it, so
the test script will perform an _npm install seneca_ (if needed). This is not
saved to _package.json_.
```sh
npm test
```
## Releases

@@ -897,4 +881,32 @@

## Testing with Docker Compose
With docker-machine and docker-compose installed run the following commands:
```
docker-compose build
docker-compose up
```
The output will be the stdout from the server and client logs. You should also
see the client instance outputting the result from the server: `{ hex: '#FF0000' }`
## Contributing
The [Senecajs org][] encourage open participation. If you feel you can help in any way, be it with
documentation, examples, extra testing, or new features please get in touch.
## License
Copyright Richard Rodger and other contributors 2015, Licensed under [MIT][].
[travis-badge]: https://travis-ci.org/senecajs/seneca-transport.svg
[travis-url]: https://travis-ci.org/senecajs/seneca-transport
[gitter-badge]: https://badges.gitter.im/Join%20Chat.svg
[gitter-url]: https://gitter.im/senecajs/seneca
[MIT]: ./LICENSE
[Senecajs org]: https://github.com/senecajs/
[Seneca.js]: https://www.npmjs.com/package/seneca
[senecajs.org]: http://senecajs.org/
[leveldb]: http://leveldb.org/
[github issue]: https://github.com/senecajs/seneca-transport/issues
[@senecajs]: http://twitter.com/senecajs
/* Copyright (c) 2013-2015 Richard Rodger, MIT License */
/* jshint node:true, asi:true, eqnull:true */
"use strict";
'use strict'
// Load modules
var _ = require('lodash')
var LruCache = require('lru-cache')
var Tcp = require('./lib/tcp')
var TransportUtil = require('./lib/transport-utils.js')
var Http = require('./lib/http')
var buffer = require('buffer')
var util = require('util')
var net = require('net')
var stream = require('stream')
var _ = require('lodash')
var connect = require('connect')
var needle = require('needle')
var lrucache = require('lru-cache')
var reconnect = require('reconnect-net')
var timeout = require('connect-timeout')
var query = require('connect-query')
var jsonic = require('jsonic')
var error = require('eraro')({
package: 'seneca',
msgmap: ERRMSGMAP(),
override: true
})
var make_tu = require('./lib/transport-utils.js')
module.exports = function transport( options ) {
/* jshint validthis:true */
var seneca = this
var plugin = 'transport'
var so = seneca.options()
options = seneca.util.deepextend({
// Declare internals
var internals = {
defaults: {
msgprefix: 'seneca_',
callmax: 111111,
msgidlen: 12,
callmax: 111111,
msgidlen: 12,
warn: {
unknown_message_id: true,
invalid_kind: true,
invalid_origin: true,
no_message_id: true,
message_loop: true,
own_message: true,
invalid_kind: true,
invalid_origin: true,
no_message_id: true,
message_loop: true,
own_message: true
},
check: {

@@ -57,539 +30,96 @@ message_loop: true,

},
web: {
type: 'web',
port: 10101,
host: '0.0.0.0',
path: '/act',
type: 'web',
port: 10101,
host: '0.0.0.0',
path: '/act',
protocol: 'http',
timeout: Math.max( so.timeout ? so.timeout-555 : 5555, 555 )
timeout: 5555
},
tcp: {
type: 'tcp',
host: '0.0.0.0',
port: 10201,
timeout: Math.max( so.timeout ? so.timeout-555 : 5555, 555 )
},
type: 'tcp',
host: '0.0.0.0',
port: 10201,
timeout: 5555
}
},
plugin: 'transport'
}
},options)
module.exports = function (options) {
var seneca = this
var settings = seneca.util.deepextend(internals.defaults, options)
var callmap = LruCache(settings.callmax)
var transportUtil = new TransportUtil({
callmap: callmap,
seneca: seneca,
options: settings
})
seneca.add({ role: internals.plugin, cmd: 'inflight' }, internals.inflight(callmap))
seneca.add({ role: internals.plugin, cmd: 'listen' }, internals.listen)
seneca.add({ role: internals.plugin, cmd: 'client' }, internals.client)
// Pending callbacks for all transports.
var callmap = lrucache( options.callmax )
seneca.add({ role: internals.plugin, hook: 'listen', type: 'tcp' }, Tcp.listen(settings, transportUtil))
seneca.add({ role: internals.plugin, hook: 'client', type: 'tcp' }, Tcp.client(settings, transportUtil))
// Utility functions, bound to this transport context
var tu = make_tu( { callmap:callmap, seneca:seneca, options:options } )
seneca.add({ role: internals.plugin, hook: 'listen', type: 'web' }, Http.listen(settings, transportUtil))
seneca.add({ role: internals.plugin, hook: 'client', type: 'web' }, Http.client(settings, transportUtil))
seneca.add({role:plugin,cmd:'inflight'}, cmd_inflight)
seneca.add({role:plugin,cmd:'listen'}, cmd_listen)
seneca.add({role:plugin,cmd:'client'}, cmd_client)
seneca.add({role:plugin,hook:'listen',type:'tcp'}, hook_listen_tcp)
seneca.add({role:plugin,hook:'client',type:'tcp'}, hook_client_tcp)
seneca.add({role:plugin,hook:'listen',type:'web'}, hook_listen_web)
seneca.add({role:plugin,hook:'client',type:'web'}, hook_client_web)
// Aliases.
seneca.add({role:plugin,hook:'listen',type:'http'}, hook_listen_web)
seneca.add({role:plugin,hook:'client',type:'http'}, hook_client_web)
seneca.add({ role: internals.plugin, hook: 'listen', type: 'http' }, Http.listen(settings, transportUtil))
seneca.add({ role: internals.plugin, hook: 'client', type: 'http' }, Http.client(settings, transportUtil))
// Legacy API.
seneca.add({role:plugin,hook:'listen',type:'direct'}, hook_listen_web)
seneca.add({role:plugin,hook:'client',type:'direct'}, hook_client_web)
seneca.add({ role: internals.plugin, hook: 'listen', type: 'direct' }, Http.listen(settings, transportUtil))
seneca.add({ role: internals.plugin, hook: 'client', type: 'direct' }, Http.client(settings, transportUtil))
return {
name: internals.plugin,
exportmap: { utils: transportUtil },
options: settings
}
}
function cmd_inflight( args, done ) {
internals.inflight = function (callmap) {
return function (args, callback) {
var inflight = {}
callmap.forEach( function(v,k) {
inflight[k] = v
callmap.forEach(function (val, key) {
inflight[key] = val
})
done( null, inflight )
callback(null, inflight)
}
}
internals.listen = function (args, callback) {
var seneca = this
function cmd_listen( args, done ) {
var seneca = this
var listen_config = args.config
var listen_args =
seneca.util.clean(
_.omit(
_.extend({},listen_config,{role:plugin,hook:'listen'}),'cmd'))
if( handle_legacy_types(listen_args.type,done) ) {
seneca.act( listen_args, done )
}
var config = _.extend({}, args.config, { role: internals.plugin, hook: 'listen' })
var listen_args = seneca.util.clean(_.omit(config, 'cmd'))
var legacyError = internals.legacyError(seneca, listen_args.type)
if (legacyError) {
return callback(legacyError)
}
seneca.act(listen_args, callback)
}
internals.client = function (args, callback) {
var seneca = this
function cmd_client( args, done ) {
var seneca = this
var client_config = args.config
var client_args =
seneca.util.clean(
_.omit(
_.extend({},client_config,{role:plugin,hook:'client'}),'cmd'))
if( handle_legacy_types(client_args.type,done) ) {
seneca.act( client_args, done )
}
var config = _.extend({}, args.config, { role: internals.plugin, hook: 'client' })
var client_args = seneca.util.clean(_.omit(config, 'cmd'))
var legacyError = internals.legacyError(seneca, client_args.type)
if (legacyError) {
return callback(legacyError)
}
seneca.act(client_args, callback)
}
function handle_legacy_types(type,done) {
var ok = false
if( 'pubsub' == type ) {
done(seneca.fail('plugin-needed',{name:'seneca-redis-transport'}))
}
else if( 'queue' == type ) {
done(seneca.fail('plugin-needed',{name:'seneca-beanstalkd-transport'}))
}
else ok = true;
return ok;
internals.legacyError = function (seneca, type) {
if (type === 'pubsub') {
return seneca.fail('plugin-needed', { name: 'seneca-redis-transport' })
}
function hook_listen_tcp( args, done ) {
var seneca = this
var type = args.type
var listen_options = seneca.util.clean(_.extend({},options[type],args))
function make_msger() {
var msger = new stream.Duplex({objectMode:true})
msger._read = function() {}
msger._write = function( data, enc , done ) {
var stream_instance = this
if( util.isError(data) ) {
var out = tu.prepare_response(seneca,{})
out.input = data.input
out.error = error('invalid_json',{input:data.input})
stream_instance.push(out)
return done()
}
tu.handle_request( seneca, data, listen_options, function(out) {
if( null == out ) return done();
stream_instance.push(out)
return done();
})
}
return msger
}
var connections = []
var listen = net.createServer(function(connection) {
seneca.log.debug('listen', 'connection', listen_options,
'remote', connection.remoteAddress, connection.remotePort)
connection
.pipe(json_parser_stream())
.pipe(make_msger())
.pipe(json_stringify_stream())
.pipe(connection)
connection.on('error',function(err){
seneca.log.error('listen', 'pipe-error', listen_options, err.stack||err)
})
connections.push(connection)
})
listen.on('listening', function() {
seneca.log.debug('listen', 'open',
listen_options)
done()
})
listen.on('error', function(err) {
seneca.log.error('listen', 'net-error', listen_options, err.stack||err)
})
listen.on('close', function() {
seneca.log.debug('listen', 'close', listen_options)
})
listen.listen( listen_options.port, listen_options.host )
tu.close( seneca, function(done){
listen.close()
connections.forEach(function(con){
try { con.destroy() } catch(e) { seneca.log.error(e) }
})
done()
})
if (type === 'queue') {
return seneca.fail('plugin-needed', { name: 'seneca-beanstalkd-transport' })
}
function hook_client_tcp( args, clientdone ) {
var seneca = this
var type = args.type
var client_options = seneca.util.clean(_.extend({},options[type],args))
tu.make_client( seneca, make_send, client_options, clientdone )
function make_send( spec, topic, send_done ) {
seneca.log.debug('client', type, 'send-init',
spec, topic, client_options)
function make_msger() {
var msger = new stream.Duplex({objectMode:true})
msger._read = function() {}
msger._write = function( data, enc, done ) {
tu.handle_response( seneca, data, client_options )
return done();
}
return msger;
}
var msger = make_msger()
var connections = []
var clientconnect = reconnect( function(client) {
connections.push(client)
client
.pipe( json_parser_stream() )
.pipe( msger )
.pipe( json_stringify_stream() )
.pipe( client )
}).on('connect', function() {
seneca.log.debug('client', type, 'connect',
spec, topic, client_options)
}).on('reconnect', function() {
seneca.log.debug('client', type, 'reconnect',
spec, topic, client_options)
}).on('disconnect', function(err) {
seneca.log.debug('client', type, 'disconnect',
spec, topic, client_options,
(err&&err.stack)||err)
}).connect({
port: client_options.port,
host: client_options.host
})
send_done( null, function( args, done ) {
var outmsg = tu.prepare_request( this, args, done )
msger.push( outmsg )
})
tu.close( seneca, function( done ) {
clientconnect.disconnect()
connections.forEach(function(con){
try { con.destroy() } catch(e) { seneca.log.error(e) }
})
done()
})
}
}
function json_parser_stream() {
var json_parser = new stream.Duplex({objectMode:true})
json_parser.linebuf = []
json_parser._read = function() {}
json_parser._write = function(data,enc,done) {
var str = ''+data
var endline = -1
var remain = 0
while( -1 != (endline = str.indexOf('\n',remain)) ) {
this.linebuf.push( str.substring(remain,endline) )
var jsonstr = this.linebuf.join('')
this.linebuf.length = 0
remain = endline+1
if( '' === jsonstr ) {
return done();
}
var outdata = tu.parseJSON( seneca, 'stream', jsonstr )
if( outdata ) {
this.push(outdata)
}
}
if( -1 == endline ) {
this.linebuf.push(str.substring(remain))
}
return done();
}
return json_parser;
}
function json_stringify_stream() {
var json_stringify = new stream.Duplex({objectMode:true})
json_stringify._read = function() {}
json_stringify._write = function( data, enc, done ) {
var out = tu.stringifyJSON( seneca, 'stream', data )
if( out ) {
this.push(out+'\n')
}
done()
}
return json_stringify;
}
function hook_listen_web( args, done ) {
var seneca = this
var type = args.type
var listen_options = seneca.util.clean(_.extend({},options[type],args))
var app = connect()
app.use( timeout( listen_options.timeout ) )
// query params get injected into args
// let's you use a GET for debug
// GETs can have side-effects, this is not a web server, or a REST API
app.use( query() )
app.use( function( req, res, next ) {
var buf = []
req.setEncoding('utf8')
req.on('data', function(chunk) { buf.push(chunk) })
req.on('end', function() {
try {
var bufstr = buf.join('')
var bodydata =
0 < bufstr.length ? tu.parseJSON(seneca,'req-body',bufstr) : {}
if( util.isError(bodydata) ) {
var out = tu.prepare_response(seneca,{})
out.input = bufstr
out.error = error('invalid_json',{input:bufstr})
send_response(res,out,{})
return;
}
req.body = _.extend(
{},
bodydata,
(req.query && req.query.args$) ? jsonic(req.query.args$) : {},
req.query||{} )
next();
}
catch(err) {
err.body = err.message+': '+bufstr
err.status = 400
next(err)
}
})
})
app.use( function( req, res, next ) {
if( 0 !== req.url.indexOf(listen_options.path) ) return next();
var data
var standard = !!req.headers['seneca-id']
if( standard ) {
data = {
id: req.headers['seneca-id'],
kind: 'act',
origin: req.headers['seneca-origin'],
track: tu.parseJSON(
seneca,'track-receive',req.headers['seneca-track']) || [],
time: {
client_sent: req.headers['seneca-time-client-sent'],
},
act: req.body,
}
}
// convenience for non-seneca clients
else {
data = {
id: seneca.idgen(),
kind: 'act',
origin: req.headers['user-agent'] || 'UNKNOWN',
track: [],
time: {
client_sent: Date.now()
},
act: req.body,
}
}
tu.handle_request( seneca, data, listen_options, function(out){
send_response(res,out,data)
})
})
seneca.log.debug('listen', listen_options )
var listen = app.listen( listen_options.port, listen_options.host )
tu.close( seneca, function( done ) {
listen.close()
done()
})
function send_response(res,out,data) {
var outjson = "null"
var iserror = false
var httpcode = 200
if( null != out ) {
if( out.res ) {
outjson = tu.stringifyJSON(seneca,'listen-web',out.res)
}
else if( out.error ) {
iserror = true
outjson = tu.stringifyJSON(seneca,'listen-web',out.error)
}
}
var headers = {
'Content-Type': 'application/json',
'Cache-Control': 'private, max-age=0, no-cache, no-store',
'Content-Length': buffer.Buffer.byteLength(outjson),
}
headers['seneca-id'] = out ? out.id : seneca.id
headers['seneca-kind'] = 'res'
headers['seneca-origin'] = out ? out.origin : 'UNKNOWN'
headers['seneca-accept'] = seneca.id
headers['seneca-track'] = ''+(data.track ? data.track : [])
headers['seneca-time-client-sent'] =
out && out.item ? out.time.client_sent : '0'
headers['seneca-time-listen-recv'] =
out && out.item ? out.time.listen_recv : '0'
headers['seneca-time-listen-sent'] =
out && out.item ? out.time.listen_sent : '0'
if( iserror ) {
httpcode = 500
}
res.writeHead( httpcode, headers )
res.end( outjson )
}
done()
}
function hook_client_web( args, clientdone ) {
var seneca = this
var type = args.type
var client_options = seneca.util.clean(_.extend({},options[type],args))
tu.make_client( seneca, make_send, client_options, clientdone )
function make_send( spec, topic, send_done ) {
var fullurl =
'http://'+client_options.host+':'+
client_options.port+client_options.path
seneca.log.debug('client', 'web', 'send', spec, topic, client_options,
fullurl )
send_done( null, function( args, done ) {
var data = tu.prepare_request( this, args, done )
var headers = {
'seneca-id': data.id,
'seneca-kind': 'req',
'seneca-origin': seneca.id,
'seneca-track': tu.stringifyJSON(
seneca,'send-track',data.track||[]),
'seneca-time-client-sent': data.time.client_sent
}
needle.post(
fullurl,
data.act,
{
json: true,
headers: headers,
timeout: client_options.timeout,
},
function(err,res) {
var data = {
kind: 'res',
res: res && _.isObject(res.body) ? res.body : null,
error: err
}
if( res ) {
data.id = res.headers['seneca-id']
data.origin = res.headers['seneca-origin']
data.accept = res.headers['seneca-accept']
data.time = {
client_sent: res.headers['seneca-time-client-sent'],
listen_recv: res.headers['seneca-time-listen-recv'],
listen_sent: res.headers['seneca-time-listen-sent'],
}
if( 200 !== res.statusCode ) {
data.error = res.body
}
}
tu.handle_response( seneca, data, client_options )
}
)
})
tu.close( seneca, function( done ) {
done()
})
}
}
return {
name: plugin,
exportmap: { utils: tu },
options: options
}
}
// Error code messages.
function ERRMSGMAP() {
return {
'invalid_json':'Invalid JSON: <%=input%>.',
}
}
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc