seneca-transport
Advanced tools
Comparing version 0.7.2 to 0.8.0
/* Copyright (c) 2015 Richard Rodger, MIT License */ | ||
/* jshint node:true, asi:true, eqnull:true */ | ||
"use strict"; | ||
'use strict' | ||
// Load modules | ||
var Util = require('util') | ||
var _ = require('lodash') | ||
var Nid = require('nid') | ||
var Patrun = require('patrun') | ||
var Gex = require('gex') | ||
var Jsonic = require('jsonic') | ||
var Eraro = require('eraro') | ||
var util = require('util') | ||
// Declare internals | ||
var internals = { | ||
error: Eraro({ | ||
package: 'seneca', | ||
msgmap: { | ||
'no_data': 'The message has no data.', | ||
'invalid_kind_act': 'Inbound messages should have kind "act", kind was: <%=kind%>.', | ||
'no_message_id': 'The message has no identifier.', | ||
'invalid_origin': 'The message response is not for this instance, origin was <%=origin%>.', | ||
'unknown_message_id': 'The message has an unknown identifier', | ||
'own_message': 'Inbound message rejected as originated from this server.', | ||
'message_loop': 'Inbound message rejected as looping back to this server.', | ||
'data_error': 'Inbound message included an error description.', | ||
'invalid_json': 'Invalid JSON: <%=input%>.' | ||
}, | ||
override: true | ||
}) | ||
} | ||
module.exports = internals.Utils = function (context) { | ||
this._msgprefix = (!context.options.msgprefix ? '' : context.options.msgprefix) | ||
this._context = context | ||
} | ||
var _ = require('lodash') | ||
var nid = require('nid') | ||
var patrun = require('patrun') | ||
var gex = require('gex') | ||
var jsonic = require('jsonic') | ||
internals.Utils.prototype.handle_response = function (seneca, data, client_options) { | ||
data.time = data.time || {} | ||
data.time.client_recv = Date.now() | ||
var error = require('eraro')({ | ||
package: 'seneca', | ||
msgmap: ERRMSGMAP(), | ||
override: true | ||
}) | ||
if (data.kind !== 'res') { | ||
if (this._context.options.warn.invalid_kind) { | ||
seneca.log.warn('client', 'invalid_kind_res', client_options, data) | ||
} | ||
return false | ||
} | ||
if (data.id === null) { | ||
if (this._context.options.warn.no_message_id) { | ||
seneca.log.warn('client', 'no_message_id', client_options, data) | ||
} | ||
return false | ||
} | ||
if (seneca.id !== data.origin) { | ||
if (this._context.options.warn.invalid_origin) { | ||
seneca.log.warn('client', 'invalid_origin', client_options, data) | ||
} | ||
return false | ||
} | ||
module.exports = function( ctxt ) { | ||
var msgprefix = (null == ctxt.options.msgprefix ? '' : ctxt.options.msgprefix) | ||
var callmeta = this._context.callmap.get(data.id) | ||
var tu = { | ||
if (callmeta) { | ||
this._context.callmap.del(data.id) | ||
} | ||
else { | ||
if (this._context.options.warn.unknown_message_id) { | ||
seneca.log.warn('client', 'unknown_message_id', client_options, data) | ||
} | ||
return false | ||
} | ||
prepare_response: function prepare_response( seneca, input ) { | ||
return { | ||
id: input.id, | ||
kind: 'res', | ||
origin: input.origin, | ||
accept: seneca.id, | ||
track: input.track, | ||
time: { | ||
client_sent: (input.time && input.time.client_sent) || 0, | ||
listen_recv: Date.now() | ||
}, | ||
} | ||
}, | ||
var err = null | ||
var result = null | ||
if (data.error) { | ||
err = new Error(data.error.message) | ||
handle_response: function handle_response( seneca, data, client_options ) { | ||
data.time = data.time || {} | ||
data.time.client_recv = Date.now() | ||
_.each(data.error, function (value, key) { | ||
err[key] = value | ||
}) | ||
} | ||
else { | ||
result = this.handle_entity(data.res) | ||
} | ||
if( 'res' !== data.kind ) { | ||
if( ctxt.options.warn.invalid_kind ) { | ||
seneca.log.warn('client', 'invalid_kind_res', client_options, data) | ||
} | ||
return false | ||
} | ||
var actinfo = { | ||
id: data.id, | ||
accept: data.accept, | ||
track: data.track, | ||
time: data.time | ||
} | ||
if( null == data.id ) { | ||
if( ctxt.options.warn.no_message_id ) { | ||
seneca.log.warn('client', 'no_message_id', client_options, data); | ||
} | ||
return false; | ||
} | ||
this.callmeta({ | ||
callmeta: callmeta, | ||
err: err, | ||
result: result, | ||
actinfo: actinfo, | ||
seneca: seneca, | ||
client_options: client_options, | ||
data: data | ||
}) | ||
if( seneca.id !== data.origin ) { | ||
if( ctxt.options.warn.invalid_origin ) { | ||
seneca.log.warn('client', 'invalid_origin', client_options, data) | ||
} | ||
return false | ||
} | ||
return true | ||
} | ||
internals.Utils.prototype.callmeta = function (options) { | ||
try { | ||
options.callmeta.done(options.err, options.result, options.actinfo) | ||
} | ||
catch (e) { | ||
options.seneca.log.error('client', 'callback_error', options.client_options, options.data, e.stack || e) | ||
} | ||
} | ||
var callmeta = ctxt.callmap.get(data.id) | ||
internals.Utils.prototype.prepare_request = function (seneca, args, done) { | ||
var callmeta = { | ||
args: args, | ||
done: _.bind(done, seneca), | ||
when: Date.now() | ||
} | ||
if( callmeta ) { | ||
ctxt.callmap.del( data.id ) | ||
} | ||
else { | ||
if( ctxt.options.warn.unknown_message_id ) { | ||
seneca.log.warn('client', 'unknown_message_id', client_options, data); | ||
} | ||
return false; | ||
} | ||
this._context.callmap.set(args.meta$.id, callmeta) | ||
var err = null | ||
var result = null | ||
var track = [] | ||
if (args.transport$) { | ||
track = _.clone((args.transport$.track || [])) | ||
} | ||
track.push(seneca.id) | ||
if( data.error ) { | ||
err = new Error( data.error.message ) | ||
var output = { | ||
id: args.meta$.id, | ||
kind: 'act', | ||
origin: seneca.id, | ||
track: track, | ||
time: { client_sent: Date.now() }, | ||
act: seneca.util.clean(args) | ||
} | ||
_.each(data.error,function(v,k){ | ||
err[k] = v | ||
}) | ||
} | ||
else { | ||
result = tu.handle_entity(data.res) | ||
} | ||
var actinfo = { | ||
id: data.id, | ||
accept: data.accept, | ||
track: data.track, | ||
time: data.time | ||
} | ||
return output | ||
} | ||
try { | ||
callmeta.done( err, result, actinfo ) | ||
} | ||
catch(e) { | ||
seneca.log.error( | ||
'client', 'callback_error', client_options, data, e.stack||e) | ||
} | ||
internals.Utils.prototype.handle_request = function (seneca, data, listen_options, respond) { | ||
if (!data) { | ||
return respond({ input: data, error: internals.error('no_data') }) | ||
} | ||
return true; | ||
}, | ||
if (data.kind !== 'act') { | ||
if (this._context.options.warn.invalid_kind) { | ||
seneca.log.warn('listen', 'invalid_kind_act', listen_options, data) | ||
} | ||
return respond({ | ||
input: data, | ||
error: internals.error('invalid_kind_act', { kind: data.kind }) | ||
}) | ||
} | ||
if (data.id === null) { | ||
if (this._context.options.warn.no_message_id) { | ||
seneca.log.warn('listen', 'no_message_id', listen_options, data) | ||
} | ||
return respond({ input: data, error: internals.error('no_message_id') }) | ||
} | ||
prepare_request: function prepare_request( seneca, args, done ) { | ||
var callmeta = { | ||
args: args, | ||
done: _.bind(done,seneca), | ||
when: Date.now() | ||
if (this._context.options.check.own_message && this._context.callmap.has(data.id)) { | ||
if (this._context.options.warn.own_message) { | ||
seneca.log.warn('listen', 'own_message', listen_options, data) | ||
} | ||
return respond({ input: data, error: internals.error('own_message') }) | ||
} | ||
if (this._context.options.check.message_loop && Array.isArray(data.track)) { | ||
for (var i = 0; i < data.track.length; i++) { | ||
if (seneca.id === data.track[i]) { | ||
if (this._context.options.warn.message_loop) { | ||
seneca.log.warn('listen', 'message_loop', listen_options, data) | ||
} | ||
return respond({ input: data, error: internals.error('message_loop') }) | ||
} | ||
} | ||
} | ||
ctxt.callmap.set(args.meta$.id,callmeta) | ||
if (data.error) { | ||
seneca.log.error('listen', 'data_error', listen_options, data) | ||
return respond({ input: data, error: internals.error('data_error') }) | ||
} | ||
var track = [] | ||
if( args.transport$ ) { | ||
track = _.clone((args.transport$.track||[])) | ||
} | ||
track.push(seneca.id) | ||
var output = this.prepareResponse(seneca, data) | ||
var input = this.handle_entity(data.act) | ||
var output = { | ||
id: args.meta$.id, | ||
kind: 'act', | ||
origin: seneca.id, | ||
track: track, | ||
time: { client_sent: Date.now() }, | ||
act: seneca.util.clean(args), | ||
} | ||
input.transport$ = { | ||
track: data.track || [], | ||
origin: data.origin, | ||
time: data.time | ||
} | ||
return output; | ||
}, | ||
input.id$ = data.id | ||
this.requestAct(seneca, input, output, respond) | ||
} | ||
handle_request: function handle_request( | ||
seneca, data, listen_options, respond | ||
) { | ||
if( null == data ) return respond({input:data,error:error('no_data')}); | ||
internals.Utils.prototype.requestAct = function (seneca, input, output, respond) { | ||
var self = this | ||
if( 'act' != data.kind ) { | ||
if( ctxt.options.warn.invalid_kind ) { | ||
seneca.log.warn('listen', 'invalid_kind_act', listen_options, data) | ||
} | ||
return respond({ | ||
input:data, | ||
error:error('invalid_kind_act',{kind:data.kind}) | ||
}); | ||
} | ||
try { | ||
seneca.act(input, function (err, out) { | ||
self.update_output(input, output, err, out) | ||
respond(output) | ||
}) | ||
} | ||
catch (e) { | ||
self.catch_act_error(seneca, e, input, {}, output) | ||
respond(output) | ||
} | ||
} | ||
if( null == data.id ) { | ||
if( ctxt.options.warn.no_message_id ) { | ||
seneca.log.warn('listen', 'no_message_id', listen_options, data) | ||
} | ||
return respond({input:data,error:error('no_message_id')}); | ||
} | ||
internals.Utils.prototype.make_client = function (context_seneca, make_send, client_options, client_done) { | ||
var instance = this._context.seneca | ||
if( ctxt.options.check.own_message && ctxt.callmap.has(data.id) ) { | ||
if( ctxt.options.warn.own_message ) { | ||
seneca.log.warn('listen', 'own_message', listen_options, data) | ||
} | ||
return respond({input:data,error:error('own_message')}); | ||
} | ||
// legacy api | ||
if (!context_seneca.seneca) { | ||
client_done = client_options | ||
client_options = make_send | ||
make_send = context_seneca | ||
} | ||
else { | ||
instance = context_seneca | ||
} | ||
if( ctxt.options.check.message_loop && _.isArray(data.track) ) { | ||
for( var i = 0; i < data.track.length; i++ ) { | ||
if( seneca.id === data.track[i] ) { | ||
if( ctxt.options.warn.message_loop ) { | ||
seneca.log.warn('listen', 'message_loop', listen_options, data) | ||
} | ||
return respond({input:data,error:error('message_loop')}); | ||
} | ||
} | ||
} | ||
var pins = this.resolve_pins(client_options) | ||
instance.log.debug('client', client_options, pins || 'any') | ||
if( data.error ) { | ||
seneca.log.error('listen', 'data_error', listen_options, data ) | ||
return respond({input:data,error:error('data_error')}); | ||
} | ||
var finish = function (err, send) { | ||
if (err) { | ||
return client_done(err) | ||
} | ||
client_done(null, send) | ||
} | ||
var output = tu.prepare_response( seneca, data ) | ||
var input = tu.handle_entity( data.act ) | ||
if (pins) { | ||
var argspatrun = this.make_argspatrun(pins) | ||
var resolvesend = this.make_resolvesend(client_options, {}, make_send) | ||
input.transport$ = { | ||
track: data.track || [], | ||
origin: data.origin, | ||
time: data.time | ||
} | ||
return this.make_pinclient(resolvesend, argspatrun, finish) | ||
} | ||
input.id$ = data.id | ||
this.make_anyclient(client_options, make_send, finish) | ||
} | ||
try { | ||
seneca.act( input, function( err, out ) { | ||
tu.update_output(input,output,err,out) | ||
respond(output) | ||
}) | ||
} | ||
catch(e) { | ||
tu.catch_act_error( seneca, e, listen_options, data, output ) | ||
respond(output) | ||
} | ||
}, | ||
internals.Utils.prototype.make_anyclient = function (opts, make_send, done) { | ||
var self = this | ||
make_send({}, this._msgprefix + 'any', function (err, send) { | ||
if (err) { | ||
return done(err) | ||
} | ||
if (typeof send !== 'function') { | ||
return done(self._context.seneca.fail('null-client', { opts: opts })) | ||
} | ||
var client = { | ||
id: Nid(), | ||
toString: function () { return 'any-' + this.id }, | ||
make_client: function make_client( | ||
context_seneca, | ||
make_send, | ||
client_options, | ||
client_done | ||
) { | ||
var instance = ctxt.seneca | ||
// TODO: is this used? | ||
match: function (args) { | ||
return !this.has(args) | ||
}, | ||
// legacy api | ||
if( !context_seneca.seneca ) { | ||
client_done = client_options | ||
client_options = make_send | ||
make_send = context_seneca | ||
send: function (args, done) { | ||
send.call(this, args, done) | ||
} | ||
else { | ||
instance = context_seneca | ||
} | ||
} | ||
var pins = tu.resolve_pins( client_options ) | ||
instance.log.debug( 'client', client_options, pins||'any' ) | ||
done(null, client) | ||
}) | ||
} | ||
if( pins ) { | ||
var argspatrun = tu.make_argspatrun( pins ) | ||
var resolvesend = tu.make_resolvesend( client_options, {}, make_send ) | ||
internals.Utils.prototype.make_pinclient = function (resolvesend, argspatrun, done) { | ||
var client = { | ||
id: Nid(), | ||
toString: function () { | ||
return 'pin-' + argspatrun.mark + '-' + this.id | ||
}, | ||
tu.make_pinclient( resolvesend, argspatrun, function( err, send ) { | ||
if( err ) return client_done(err); | ||
client_done( null, send ) | ||
}) | ||
} | ||
else { | ||
tu.make_anyclient( client_options, make_send, function( err, send ) { | ||
if( err ) return client_done(err); | ||
client_done( null, send ) | ||
}) | ||
} | ||
// TODO: is this used? | ||
match: function (args) { | ||
var match = !!argspatrun.find(args) | ||
return match | ||
}, | ||
make_anyclient: function make_anyclient( opts, make_send, done ) { | ||
make_send( {}, msgprefix+'any', function( err, send ) { | ||
if( err ) return done(err); | ||
if( !_.isFunction(send) ) | ||
return done(ctxt.seneca.fail('null-client',{opts:opts})); | ||
var client = { | ||
id: nid(), | ||
toString: function(){ return 'any-'+this.id }, | ||
// TODO: is this used? | ||
match: function( args ) { | ||
return !this.has(args) | ||
}, | ||
send: function( args, done ) { | ||
send.call(this,args,done) | ||
} | ||
send: function (args, done) { | ||
var seneca = this | ||
var spec = argspatrun.find(args) | ||
resolvesend(spec, args, function (err, send) { | ||
if (err) { | ||
return done(err) | ||
} | ||
done( null, client ) | ||
send.call(seneca, args, done) | ||
}) | ||
}, | ||
} | ||
} | ||
done(null, client) | ||
} | ||
make_pinclient: function make_pinclient( resolvesend, argspatrun, done ) { | ||
var client = { | ||
id: nid(), | ||
toString: function(){ return 'pin-'+argspatrun.mark+'-'+this.id }, | ||
internals.Utils.prototype.resolve_pins = function (opts) { | ||
var pins = opts.pin || opts.pins | ||
if (pins) { | ||
pins = Array.isArray(pins) ? pins : [pins] | ||
} | ||
// TODO: is this used? | ||
match: function( args ) { | ||
var match = !!argspatrun.find(args) | ||
return match | ||
}, | ||
if (pins) { | ||
pins = _.map(pins, function (pin) { | ||
return (typeof pin === 'string') ? Jsonic(pin) : pin | ||
}) | ||
} | ||
send: function( args, done ) { | ||
var seneca = this | ||
var spec = argspatrun.find(args) | ||
resolvesend(spec,args,function(err, send){ | ||
if( err ) return done(err); | ||
send.call(seneca,args,done) | ||
}) | ||
} | ||
return pins | ||
} | ||
internals.Utils.prototype.make_argspatrun = function (pins) { | ||
var argspatrun = Patrun(function (pat, data) { | ||
var gexers = {} | ||
_.each(pat, function (val, key) { | ||
if ((typeof val === 'string') && ~val.indexOf('*')) { | ||
delete pat[key] | ||
gexers[key] = Gex(val) | ||
} | ||
}) | ||
done( null, client ) | ||
}, | ||
// handle previous patterns that match this pattern | ||
var prev = this.list(pat) | ||
var prevfind = prev[0] && prev[0].find | ||
var prevdata = prev[0] && this.findexact(prev[0].match) | ||
return function (args, data) { | ||
var out = data | ||
_.each(gexers, function (g, k) { | ||
var v = (args[k] === null) ? '' : args[k] | ||
if (g.on(v) === null) { | ||
out = null | ||
} | ||
}) | ||
resolve_pins: function resolve_pins( opts ) { | ||
var pins = opts.pin || opts.pins | ||
if( pins ) { | ||
pins = _.isArray(pins) ? pins : [pins] | ||
if (prevfind && out === null) { | ||
out = prevfind.call(this, args, prevdata) | ||
} | ||
if( pins ) { | ||
pins = _.map(pins,function(pin){ | ||
return _.isString(pin) ? jsonic(pin) : pin | ||
}) | ||
} | ||
return out | ||
} | ||
}) | ||
return pins | ||
}, | ||
_.each(pins, function (pin) { | ||
var spec = { pin: pin } | ||
argspatrun.add(pin, spec) | ||
}) | ||
argspatrun.mark = Util.inspect(pins).replace(/\s+/g, '').replace(/\n/g, '') | ||
// can handle glob expressions :) | ||
make_argspatrun: function make_argspatrun( pins ) { | ||
var argspatrun = patrun(function(pat,data) { | ||
var gexers = {} | ||
_.each(pat, function(v,k) { | ||
if( _.isString(v) && ~v.indexOf('*') ) { | ||
delete pat[k] | ||
gexers[k] = gex(v) | ||
} | ||
}) | ||
return argspatrun | ||
} | ||
// handle previous patterns that match this pattern | ||
var prev = this.list(pat) | ||
var prevfind = prev[0] && prev[0].find | ||
var prevdata = prev[0] && this.findexact(prev[0].match) | ||
internals.Utils.prototype.make_resolvesend = function (opts, sendmap, make_send) { | ||
var self = this | ||
return function (spec, args, done) { | ||
var topic = self.resolve_topic(opts, spec, args) | ||
var send = sendmap[topic] | ||
if (send) { | ||
return done(null, send) | ||
} | ||
return function(args,data) { | ||
var out = data | ||
_.each(gexers,function(g,k) { | ||
var v = null==args[k]?'':args[k] | ||
if( null == g.on( v ) ) { out = null } | ||
}) | ||
make_send(spec, topic, function (err, send) { | ||
if (err) { | ||
return done(err) | ||
} | ||
sendmap[topic] = send | ||
done(null, send) | ||
}) | ||
} | ||
} | ||
if( prevfind && null == out ) { | ||
out = prevfind.call(this,args,prevdata) | ||
} | ||
internals.Utils.prototype.resolve_topic = function (opts, spec, args) { | ||
var self = this | ||
if (!spec.pin) { | ||
return function () { | ||
return self._msgprefix + 'any' | ||
} | ||
} | ||
return out | ||
} | ||
}) | ||
var topicpin = _.clone(spec.pin) | ||
_.each( pins, function( pin ) { | ||
var spec = { pin:pin } | ||
argspatrun.add(pin,spec) | ||
}) | ||
var topicargs = {} | ||
_.each(topicpin, function (v, k) { | ||
topicargs[k] = args[k] | ||
}) | ||
argspatrun.mark = util.inspect(pins).replace(/\s+/g, '').replace(/\n/g, '') | ||
var sb = [] | ||
_.each(_.keys(topicargs).sort(), function (k) { | ||
sb.push(k) | ||
sb.push('=') | ||
sb.push(topicargs[k]) | ||
sb.push(',') | ||
}) | ||
return argspatrun | ||
}, | ||
var topic = this._msgprefix + (sb.join('')).replace(/[^\w\d]+/g, '_') | ||
return topic | ||
} | ||
internals.Utils.prototype.listen_topics = function (seneca, args, listen_options, do_topic) { | ||
var self = this | ||
var topics = [] | ||
make_resolvesend: function make_resolvesend( opts, sendmap, make_send ) { | ||
return function( spec, args, done ) { | ||
var topic = tu.resolve_topic(opts,spec,args) | ||
var send = sendmap[topic] | ||
if( send ) return done(null,send); | ||
var pins = this.resolve_pins(args) | ||
make_send(spec,topic,function(err,send){ | ||
if( err ) return done(err) | ||
sendmap[topic] = send | ||
done(null,send) | ||
}) | ||
} | ||
}, | ||
resolve_topic: function resolve_topic( opts, spec, args ) { | ||
if( !spec.pin ) return function() { return msgprefix+'any' } | ||
var topicpin = _.clone(spec.pin) | ||
var topicargs = {} | ||
_.each(topicpin, function(v,k) { topicargs[k]=args[k] }) | ||
if (pins) { | ||
_.each(this._context.seneca.findpins(pins), function (pin) { | ||
var sb = [] | ||
_.each( _.keys(topicargs).sort(), function(k){ | ||
_.each(_.keys(pin).sort(), function (k) { | ||
sb.push(k) | ||
sb.push('=') | ||
sb.push(topicargs[k]) | ||
sb.push(pin[k]) | ||
sb.push(',') | ||
}) | ||
var topic = msgprefix+(sb.join('')).replace(/[^\w\d]+/g,'_') | ||
return topic; | ||
}, | ||
var topic = self._msgprefix + (sb.join('')).replace(/[^\w\d]+/g, '_') | ||
topics.push(topic) | ||
}) | ||
listen_topics: function listen_topics( seneca, args, listen_options, do_topic ) { | ||
var topics = [] | ||
// TODO: die if no pins!!! | ||
// otherwise no listener established and seneca ends without msg | ||
} | ||
else { | ||
topics.push(this._msgprefix + 'any') | ||
} | ||
var pins = tu.resolve_pins( args ) | ||
if (typeof do_topic === 'function') { | ||
topics.forEach(function (topic) { | ||
do_topic(topic) | ||
}) | ||
} | ||
if( pins ) { | ||
_.each( ctxt.seneca.findpins( pins ), function(pin) { | ||
return topics | ||
} | ||
var sb = [] | ||
_.each( _.keys(pin).sort(), function(k){ | ||
sb.push(k) | ||
sb.push('=') | ||
sb.push(pin[k]) | ||
sb.push(',') | ||
}) | ||
internals.Utils.prototype.update_output = function (input, output, err, out) { | ||
output.res = out | ||
var topic = msgprefix+(sb.join('')).replace(/[^\w\d]+/g,'_') | ||
if (err) { | ||
var errobj = _.extend({}, err) | ||
errobj.message = err.message | ||
errobj.name = err.name || 'Error' | ||
output.error = errobj | ||
output.input = input | ||
} | ||
topics.push(topic) | ||
}) | ||
output.time.listen_sent = Date.now() | ||
} | ||
// TODO: die if no pins!!! | ||
// otherwise no listener established and seneca ends without msg | ||
} | ||
else { | ||
topics.push( msgprefix+'any' ) | ||
} | ||
internals.Utils.prototype.catch_act_error = function (seneca, e, listen_options, input, output) { | ||
seneca.log.error('listen', 'act-error', listen_options, e.stack || e) | ||
output.error = e | ||
output.input = input | ||
} | ||
if( _.isFunction( do_topic ) ) { | ||
topics.forEach( function(topic){ | ||
do_topic( topic ) | ||
}) | ||
} | ||
return topics; | ||
}, | ||
update_output: function update_output( input, output, err, out ) { | ||
output.res = out | ||
if( err ) { | ||
var errobj = _.extend({},err) | ||
errobj.message = err.message | ||
errobj.name = err.name || 'Error' | ||
output.error = errobj | ||
output.input = input | ||
} | ||
output.time.listen_sent = Date.now() | ||
}, | ||
catch_act_error: function catch_act_error( | ||
seneca, e, listen_options, input, output | ||
) { | ||
seneca.log.error('listen', 'act-error', listen_options, e.stack || e ) | ||
output.error = e | ||
output.input = input | ||
}, | ||
// only support first level | ||
// interim measure - deal with this in core seneca act api | ||
// allow user to specify operations on result | ||
handle_entity: function handle_entity( raw ) { | ||
if( null == raw ) return raw; | ||
internals.Utils.prototype.handle_entity = function (raw) { | ||
var self = this | ||
if (!raw) { | ||
return raw | ||
} | ||
raw = _.isObject( raw ) ? raw : {} | ||
if( raw.entity$ ) { | ||
return ctxt.seneca.make$( raw ) | ||
} | ||
else { | ||
_.each( raw, function(v,k) { | ||
if( _.isObject(v) && v.entity$ ) { | ||
raw[k] = ctxt.seneca.make$( v ) | ||
} | ||
}) | ||
return raw | ||
} | ||
}, | ||
raw = _.isObject(raw) ? raw : {} | ||
if (raw.entity$) { | ||
return this._context.seneca.make$(raw) | ||
} | ||
close: function close( seneca, closer ) { | ||
seneca.add('role:seneca,cmd:close',function( close_args, done ) { | ||
var seneca = this | ||
_.each(raw, function (value, key) { | ||
if (_.isObject(value) && value.entity$) { | ||
raw[key] = self._context.seneca.make$(value) | ||
} | ||
}) | ||
return raw | ||
} | ||
closer.call( seneca, function( err ) { | ||
if( err ) seneca.log.error(err); | ||
// legacy names | ||
internals.Utils.prototype.resolvetopic = internals.Utils.prototype.resolve_topic | ||
seneca.prior(close_args,done) | ||
}) | ||
}) | ||
}, | ||
internals.Utils.prototype.close = function (seneca, closer) { | ||
seneca.add('role:seneca,cmd:close', function (close_args, done) { | ||
var seneca = this | ||
parseJSON: function parseJSON( seneca, note, str ) { | ||
if( str ) { | ||
try { | ||
return JSON.parse( str ) | ||
} | ||
catch( e ) { | ||
seneca.log.warn( 'json-parse', note, str.replace(/[\r\n\t]+/g,''), e.message ) | ||
e.input = str | ||
return e; | ||
} | ||
closer.call(seneca, function (err) { | ||
if (err) { | ||
seneca.log.error(err) | ||
} | ||
}, | ||
seneca.prior(close_args, done) | ||
}) | ||
}) | ||
} | ||
stringifyJSON: function stringifyJSON( seneca, note, obj ) { | ||
if( obj ) { | ||
try { | ||
return JSON.stringify( obj ) | ||
} | ||
catch( e ) { | ||
seneca.log.warn( 'json-stringify', note, obj, e.message ) | ||
} | ||
} | ||
}, | ||
internals.Utils.prototype.stringifyJSON = function (seneca, note, obj) { | ||
if (!obj) { | ||
return | ||
} | ||
try { | ||
return JSON.stringify(obj) | ||
} | ||
catch (e) { | ||
seneca.log.warn('json-stringify', note, obj, e.message) | ||
} | ||
} | ||
internals.Utils.prototype.parseJSON = function (seneca, note, str) { | ||
if (!str) { | ||
return | ||
} | ||
// legacy names | ||
tu.resolvetopic = tu.resolve_topic | ||
return tu; | ||
try { | ||
return JSON.parse(str) | ||
} | ||
catch (e) { | ||
seneca.log.warn('json-parse', note, str.replace(/[\r\n\t]+/g, ''), e.message) | ||
e.input = str | ||
return e | ||
} | ||
} | ||
// Error code messages. | ||
function ERRMSGMAP() { | ||
internals.Utils.prototype.prepareResponse = function (seneca, input) { | ||
return { | ||
'no_data':'The message has no data.', | ||
'invalid_kind_act':'Inbound messages should have kind "act", kind was: <%=kind%>.', | ||
'no_message_id':'The message has no identifier.', | ||
'invalid_origin':'The message response is not for this instance, origin was <%=origin%>.', | ||
'unknown_message_id':'The message has an unknown identifier', | ||
'own_message':'Inbound message rejected as originated from this server.', | ||
'message_loop':'Inbound message rejected as looping back to this server.', | ||
'data_error':'Inbound message included an error description.', | ||
id: input.id, | ||
kind: 'res', | ||
origin: input.origin, | ||
accept: seneca.id, | ||
track: input.track, | ||
time: { | ||
client_sent: (input.time && input.time.client_sent) || 0, | ||
listen_recv: Date.now() | ||
} | ||
} | ||
} | ||
internals.Utils.prototype.error = internals.error |
{ | ||
"name": "seneca-transport", | ||
"version": "0.7.2", | ||
"version": "0.8.0", | ||
"description": "Seneca transport", | ||
"main": "transport.js", | ||
"scripts": { | ||
"test": "./test.sh", | ||
"build": "./build.sh" | ||
"test": "./node_modules/.bin/lab test/*.test.js -v -m 3000 -t 80 -L", | ||
"build": "./build.sh", | ||
"bench": "node bench", | ||
"annotate": "docco transport.js -o doc" | ||
}, | ||
@@ -22,15 +24,13 @@ "repository": { | ||
"dependencies": { | ||
"async": "0.9.2", | ||
"connect": "3.0.2", | ||
"connect-query": "0.2.0", | ||
"connect-timeout": "1.5.0", | ||
"eraro": "0.4.1", | ||
"gex": "0.2.0", | ||
"jsonic": "0.2.0", | ||
"lodash": "2.4.2", | ||
"lru-cache": "2.6.4", | ||
"needle": "0.9.2", | ||
"eraro": "0.4.x", | ||
"gex": "0.2.x", | ||
"jsonic": "0.2.x", | ||
"lodash": "3.10.x", | ||
"lru-cache": "2.7.x", | ||
"ndjson": "1.4.x", | ||
"nid": "0.3.2", | ||
"patrun": "0.4.3", | ||
"reconnect-net": "0.0.0" | ||
"patrun": "0.5.x", | ||
"qs": "5.2.0", | ||
"reconnect-core": "1.1.x", | ||
"wreck": "6.3.x" | ||
}, | ||
@@ -44,8 +44,13 @@ "files": [ | ||
"devDependencies": { | ||
"async": "1.5.x", | ||
"bench": "0.3.x", | ||
"code": "1.x.x", | ||
"docco": "0.7.x", | ||
"eslint-config-seneca": "1.x.x", | ||
"eslint-plugin-hapi": "2.x.x", | ||
"eslint-plugin-standard": "1.x.x", | ||
"lab": "6.x.x", | ||
"seneca": "plugin", | ||
"docco": "0.7.0", | ||
"jshint": "2.8.0", | ||
"mocha": "2.1.0", | ||
"seneca-transport-test": "0.1.3" | ||
"seneca-transport-test": "0.2.x" | ||
} | ||
} |
110
README.md
@@ -1,5 +0,7 @@ | ||
seneca-transport - a [Seneca](http://senecajs.org) plugin | ||
========================================================= | ||
 | ||
> A [Seneca.js][] transport plugin | ||
## Seneca Transport Plugin | ||
# seneca-transport | ||
[![Build Status][travis-badge]][travis-url] | ||
[![Gitter][gitter-badge]][gitter-url] | ||
@@ -11,28 +13,22 @@ This plugin provides the HTTP and TCP transport channels for | ||
For a gentle introduction to Seneca itself, see the | ||
[senecajs.org](http://senecajs.org) site. | ||
- __Version:__ 0.2.3 | ||
- __Tested on:__ Seneca 0.7 | ||
- __Node:__ 0.10, 0.12, 4 | ||
seneca-transport's source can be read in an annotated fashion by, | ||
- running `npm run annotate` | ||
- viewing [./doc/transport.html]() locally | ||
## Support | ||
Current Version: 0.7.2 | ||
Tested on: [Seneca](//github.com/rjrodger/seneca) 0.6.3 | ||
[](https://travis-ci.org/rjrodger/seneca-transport) | ||
Built and tested against versions: `0.10, 0.11, 0.12, iojs` | ||
[Annotated Source](http://rjrodger.github.io/seneca-transport/doc/transport.html) | ||
If you're using this module, and need help, you can: | ||
* Post a [github issue](//github.com/rjrodger/seneca-transport/issues), | ||
* Tweet to [@senecajs](http://twitter.com/senecajs), | ||
* Ask on the [](https://gitter.im/rjrodger/seneca-transport). | ||
- Post a [github issue][], | ||
- Tweet to [@senecajs][], | ||
- Ask on the [Gitter][gitter-url]. | ||
If you are new to Seneca in general, please take a look at [senecajs.org][]. We have everything from | ||
tutorials to sample apps to help get you up and running quickly. | ||
### Install | ||
This plugin module is included in the main Seneca module: | ||
This plugin module is included in the main Seneca module, | ||
@@ -77,3 +73,3 @@ ```sh | ||
var seneca = require('seneca') | ||
seneca() | ||
@@ -121,3 +117,3 @@ .use(color) | ||
```sh | ||
[TIME] vy../..15/- DEBUG act - - IN 485n.. color:red {color=red} CLIENT | ||
[TIME] vy../..15/- DEBUG act - - IN 485n.. color:red {color=red} CLIENT | ||
[TIME] ly../..80/- DEBUG act color - IN 485n.. color:red {color=red} f2rv.. | ||
@@ -170,3 +166,3 @@ [TIME] ly../..80/- DEBUG act color - OUT 485n.. color:red {hex=#FF0000} f2rv.. | ||
[TIME] ly../..80/- DEBUG plugin color - ADD f2rv.. color:red | ||
[TIME] vy../..15/- DEBUG act - - IN 485n.. color:red {color=red} CLIENT | ||
[TIME] vy../..15/- DEBUG act - - IN 485n.. color:red {color=red} CLIENT | ||
[TIME] ly../..80/- DEBUG act color - IN 485n.. color:red {color=red} f2rv.. | ||
@@ -194,3 +190,3 @@ [TIME] ly../..80/- DEBUG act color - OUT 485n.. color:red {hex=#FF0000} f2rv.. | ||
var seneca = require('seneca') | ||
seneca() | ||
@@ -211,3 +207,3 @@ .use(color) | ||
var seneca = require('seneca') | ||
seneca() | ||
@@ -248,3 +244,3 @@ .client() | ||
> Content-Type: application/x-www-form-urlencoded | ||
> | ||
> | ||
* upload completely sent off: 15 out of 15 bytes | ||
@@ -264,3 +260,3 @@ < HTTP/1.1 200 OK | ||
< Connection: keep-alive | ||
< | ||
< | ||
* Connection #0 to host localhost left intact | ||
@@ -376,3 +372,3 @@ {"hex":"#FF0000"} | ||
[TIME] f1../..79/- INFO hello Seneca/0.5.20/f1../..79/- | ||
[TIME] f1../..79/- DEBUG act - - IN wdfw.. color:red {color=red} CLIENT | ||
[TIME] f1../..79/- DEBUG act - - IN wdfw.. color:red {color=red} CLIENT | ||
[TIME] 6g../..49/- INFO plugin transport - ACT b01d.. listen open {type=tcp,host=0.0.0.0,port=10201,...} | ||
@@ -647,6 +643,6 @@ [TIME] f1../..79/- INFO plugin transport - ACT nid1.. client {type=tcp,host=0.0.0.0,port=10201,...} any | ||
* _time_: | ||
* _client_sent_: client timestamp when message sent | ||
* _listen_recv_: server timestamp when message received | ||
* _listen_sent_: server timestamp when response sent | ||
* _client_recv_: client timestamp when response received | ||
* _client_sent_: client timestamp when message sent | ||
* _listen_recv_: server timestamp when message received | ||
* _listen_sent_: server timestamp when response sent | ||
* _client_recv_: client timestamp when response received | ||
* _act_: action message data, as submitted to Seneca | ||
@@ -702,5 +698,5 @@ * _res_: response message data, as provided by Seneca | ||
function make_send( spec, topic, send_done ) { | ||
// setup topic in transport mechanism | ||
// send the args over the transport | ||
@@ -745,3 +741,3 @@ send_done( null, function( args, done ) { | ||
if( null == out ) return ...; | ||
// otherwise, send the result back | ||
@@ -784,3 +780,3 @@ // don't forget to stringifyJSON(out) if necessary | ||
The primary options are: | ||
* _msgprefix_: a string to prefix to topic names so that they are namespaced | ||
@@ -878,14 +874,2 @@ * _callmax_: the maximum number of in-flight request/response messages to cache | ||
## Testing | ||
This module itself does not contain any direct reference to Seneca, as | ||
it is a Seneca dependency. However, Seneca is needed to test it, so | ||
the test script will perform an _npm install seneca_ (if needed). This is not | ||
saved to _package.json_. | ||
```sh | ||
npm test | ||
``` | ||
## Releases | ||
@@ -897,4 +881,32 @@ | ||
## Testing with Docker Compose | ||
With docker-machine and docker-compose installed run the following commands: | ||
``` | ||
docker-compose build | ||
docker-compose up | ||
``` | ||
The output will be the stdout from the server and client logs. You should also | ||
see the client instance outputting the result from the server: `{ hex: '#FF0000' }` | ||
## Contributing | ||
The [Senecajs org][] encourage open participation. If you feel you can help in any way, be it with | ||
documentation, examples, extra testing, or new features please get in touch. | ||
## License | ||
Copyright Richard Rodger and other contributors 2015, Licensed under [MIT][]. | ||
[travis-badge]: https://travis-ci.org/senecajs/seneca-transport.svg | ||
[travis-url]: https://travis-ci.org/senecajs/seneca-transport | ||
[gitter-badge]: https://badges.gitter.im/Join%20Chat.svg | ||
[gitter-url]: https://gitter.im/senecajs/seneca | ||
[MIT]: ./LICENSE | ||
[Senecajs org]: https://github.com/senecajs/ | ||
[Seneca.js]: https://www.npmjs.com/package/seneca | ||
[senecajs.org]: http://senecajs.org/ | ||
[leveldb]: http://leveldb.org/ | ||
[github issue]: https://github.com/senecajs/seneca-transport/issues | ||
[@senecajs]: http://twitter.com/senecajs |
640
transport.js
/* Copyright (c) 2013-2015 Richard Rodger, MIT License */ | ||
/* jshint node:true, asi:true, eqnull:true */ | ||
"use strict"; | ||
'use strict' | ||
// Load modules | ||
var _ = require('lodash') | ||
var LruCache = require('lru-cache') | ||
var Tcp = require('./lib/tcp') | ||
var TransportUtil = require('./lib/transport-utils.js') | ||
var Http = require('./lib/http') | ||
var buffer = require('buffer') | ||
var util = require('util') | ||
var net = require('net') | ||
var stream = require('stream') | ||
var _ = require('lodash') | ||
var connect = require('connect') | ||
var needle = require('needle') | ||
var lrucache = require('lru-cache') | ||
var reconnect = require('reconnect-net') | ||
var timeout = require('connect-timeout') | ||
var query = require('connect-query') | ||
var jsonic = require('jsonic') | ||
var error = require('eraro')({ | ||
package: 'seneca', | ||
msgmap: ERRMSGMAP(), | ||
override: true | ||
}) | ||
var make_tu = require('./lib/transport-utils.js') | ||
module.exports = function transport( options ) { | ||
/* jshint validthis:true */ | ||
var seneca = this | ||
var plugin = 'transport' | ||
var so = seneca.options() | ||
options = seneca.util.deepextend({ | ||
// Declare internals | ||
var internals = { | ||
defaults: { | ||
msgprefix: 'seneca_', | ||
callmax: 111111, | ||
msgidlen: 12, | ||
callmax: 111111, | ||
msgidlen: 12, | ||
warn: { | ||
unknown_message_id: true, | ||
invalid_kind: true, | ||
invalid_origin: true, | ||
no_message_id: true, | ||
message_loop: true, | ||
own_message: true, | ||
invalid_kind: true, | ||
invalid_origin: true, | ||
no_message_id: true, | ||
message_loop: true, | ||
own_message: true | ||
}, | ||
check: { | ||
@@ -57,539 +30,96 @@ message_loop: true, | ||
}, | ||
web: { | ||
type: 'web', | ||
port: 10101, | ||
host: '0.0.0.0', | ||
path: '/act', | ||
type: 'web', | ||
port: 10101, | ||
host: '0.0.0.0', | ||
path: '/act', | ||
protocol: 'http', | ||
timeout: Math.max( so.timeout ? so.timeout-555 : 5555, 555 ) | ||
timeout: 5555 | ||
}, | ||
tcp: { | ||
type: 'tcp', | ||
host: '0.0.0.0', | ||
port: 10201, | ||
timeout: Math.max( so.timeout ? so.timeout-555 : 5555, 555 ) | ||
}, | ||
type: 'tcp', | ||
host: '0.0.0.0', | ||
port: 10201, | ||
timeout: 5555 | ||
} | ||
}, | ||
plugin: 'transport' | ||
} | ||
},options) | ||
module.exports = function (options) { | ||
var seneca = this | ||
var settings = seneca.util.deepextend(internals.defaults, options) | ||
var callmap = LruCache(settings.callmax) | ||
var transportUtil = new TransportUtil({ | ||
callmap: callmap, | ||
seneca: seneca, | ||
options: settings | ||
}) | ||
seneca.add({ role: internals.plugin, cmd: 'inflight' }, internals.inflight(callmap)) | ||
seneca.add({ role: internals.plugin, cmd: 'listen' }, internals.listen) | ||
seneca.add({ role: internals.plugin, cmd: 'client' }, internals.client) | ||
// Pending callbacks for all transports. | ||
var callmap = lrucache( options.callmax ) | ||
seneca.add({ role: internals.plugin, hook: 'listen', type: 'tcp' }, Tcp.listen(settings, transportUtil)) | ||
seneca.add({ role: internals.plugin, hook: 'client', type: 'tcp' }, Tcp.client(settings, transportUtil)) | ||
// Utility functions, bound to this transport context | ||
var tu = make_tu( { callmap:callmap, seneca:seneca, options:options } ) | ||
seneca.add({ role: internals.plugin, hook: 'listen', type: 'web' }, Http.listen(settings, transportUtil)) | ||
seneca.add({ role: internals.plugin, hook: 'client', type: 'web' }, Http.client(settings, transportUtil)) | ||
seneca.add({role:plugin,cmd:'inflight'}, cmd_inflight) | ||
seneca.add({role:plugin,cmd:'listen'}, cmd_listen) | ||
seneca.add({role:plugin,cmd:'client'}, cmd_client) | ||
seneca.add({role:plugin,hook:'listen',type:'tcp'}, hook_listen_tcp) | ||
seneca.add({role:plugin,hook:'client',type:'tcp'}, hook_client_tcp) | ||
seneca.add({role:plugin,hook:'listen',type:'web'}, hook_listen_web) | ||
seneca.add({role:plugin,hook:'client',type:'web'}, hook_client_web) | ||
// Aliases. | ||
seneca.add({role:plugin,hook:'listen',type:'http'}, hook_listen_web) | ||
seneca.add({role:plugin,hook:'client',type:'http'}, hook_client_web) | ||
seneca.add({ role: internals.plugin, hook: 'listen', type: 'http' }, Http.listen(settings, transportUtil)) | ||
seneca.add({ role: internals.plugin, hook: 'client', type: 'http' }, Http.client(settings, transportUtil)) | ||
// Legacy API. | ||
seneca.add({role:plugin,hook:'listen',type:'direct'}, hook_listen_web) | ||
seneca.add({role:plugin,hook:'client',type:'direct'}, hook_client_web) | ||
seneca.add({ role: internals.plugin, hook: 'listen', type: 'direct' }, Http.listen(settings, transportUtil)) | ||
seneca.add({ role: internals.plugin, hook: 'client', type: 'direct' }, Http.client(settings, transportUtil)) | ||
return { | ||
name: internals.plugin, | ||
exportmap: { utils: transportUtil }, | ||
options: settings | ||
} | ||
} | ||
function cmd_inflight( args, done ) { | ||
internals.inflight = function (callmap) { | ||
return function (args, callback) { | ||
var inflight = {} | ||
callmap.forEach( function(v,k) { | ||
inflight[k] = v | ||
callmap.forEach(function (val, key) { | ||
inflight[key] = val | ||
}) | ||
done( null, inflight ) | ||
callback(null, inflight) | ||
} | ||
} | ||
internals.listen = function (args, callback) { | ||
var seneca = this | ||
function cmd_listen( args, done ) { | ||
var seneca = this | ||
var listen_config = args.config | ||
var listen_args = | ||
seneca.util.clean( | ||
_.omit( | ||
_.extend({},listen_config,{role:plugin,hook:'listen'}),'cmd')) | ||
if( handle_legacy_types(listen_args.type,done) ) { | ||
seneca.act( listen_args, done ) | ||
} | ||
var config = _.extend({}, args.config, { role: internals.plugin, hook: 'listen' }) | ||
var listen_args = seneca.util.clean(_.omit(config, 'cmd')) | ||
var legacyError = internals.legacyError(seneca, listen_args.type) | ||
if (legacyError) { | ||
return callback(legacyError) | ||
} | ||
seneca.act(listen_args, callback) | ||
} | ||
internals.client = function (args, callback) { | ||
var seneca = this | ||
function cmd_client( args, done ) { | ||
var seneca = this | ||
var client_config = args.config | ||
var client_args = | ||
seneca.util.clean( | ||
_.omit( | ||
_.extend({},client_config,{role:plugin,hook:'client'}),'cmd')) | ||
if( handle_legacy_types(client_args.type,done) ) { | ||
seneca.act( client_args, done ) | ||
} | ||
var config = _.extend({}, args.config, { role: internals.plugin, hook: 'client' }) | ||
var client_args = seneca.util.clean(_.omit(config, 'cmd')) | ||
var legacyError = internals.legacyError(seneca, client_args.type) | ||
if (legacyError) { | ||
return callback(legacyError) | ||
} | ||
seneca.act(client_args, callback) | ||
} | ||
function handle_legacy_types(type,done) { | ||
var ok = false | ||
if( 'pubsub' == type ) { | ||
done(seneca.fail('plugin-needed',{name:'seneca-redis-transport'})) | ||
} | ||
else if( 'queue' == type ) { | ||
done(seneca.fail('plugin-needed',{name:'seneca-beanstalkd-transport'})) | ||
} | ||
else ok = true; | ||
return ok; | ||
internals.legacyError = function (seneca, type) { | ||
if (type === 'pubsub') { | ||
return seneca.fail('plugin-needed', { name: 'seneca-redis-transport' }) | ||
} | ||
function hook_listen_tcp( args, done ) { | ||
var seneca = this | ||
var type = args.type | ||
var listen_options = seneca.util.clean(_.extend({},options[type],args)) | ||
function make_msger() { | ||
var msger = new stream.Duplex({objectMode:true}) | ||
msger._read = function() {} | ||
msger._write = function( data, enc , done ) { | ||
var stream_instance = this | ||
if( util.isError(data) ) { | ||
var out = tu.prepare_response(seneca,{}) | ||
out.input = data.input | ||
out.error = error('invalid_json',{input:data.input}) | ||
stream_instance.push(out) | ||
return done() | ||
} | ||
tu.handle_request( seneca, data, listen_options, function(out) { | ||
if( null == out ) return done(); | ||
stream_instance.push(out) | ||
return done(); | ||
}) | ||
} | ||
return msger | ||
} | ||
var connections = [] | ||
var listen = net.createServer(function(connection) { | ||
seneca.log.debug('listen', 'connection', listen_options, | ||
'remote', connection.remoteAddress, connection.remotePort) | ||
connection | ||
.pipe(json_parser_stream()) | ||
.pipe(make_msger()) | ||
.pipe(json_stringify_stream()) | ||
.pipe(connection) | ||
connection.on('error',function(err){ | ||
seneca.log.error('listen', 'pipe-error', listen_options, err.stack||err) | ||
}) | ||
connections.push(connection) | ||
}) | ||
listen.on('listening', function() { | ||
seneca.log.debug('listen', 'open', | ||
listen_options) | ||
done() | ||
}) | ||
listen.on('error', function(err) { | ||
seneca.log.error('listen', 'net-error', listen_options, err.stack||err) | ||
}) | ||
listen.on('close', function() { | ||
seneca.log.debug('listen', 'close', listen_options) | ||
}) | ||
listen.listen( listen_options.port, listen_options.host ) | ||
tu.close( seneca, function(done){ | ||
listen.close() | ||
connections.forEach(function(con){ | ||
try { con.destroy() } catch(e) { seneca.log.error(e) } | ||
}) | ||
done() | ||
}) | ||
if (type === 'queue') { | ||
return seneca.fail('plugin-needed', { name: 'seneca-beanstalkd-transport' }) | ||
} | ||
function hook_client_tcp( args, clientdone ) { | ||
var seneca = this | ||
var type = args.type | ||
var client_options = seneca.util.clean(_.extend({},options[type],args)) | ||
tu.make_client( seneca, make_send, client_options, clientdone ) | ||
function make_send( spec, topic, send_done ) { | ||
seneca.log.debug('client', type, 'send-init', | ||
spec, topic, client_options) | ||
function make_msger() { | ||
var msger = new stream.Duplex({objectMode:true}) | ||
msger._read = function() {} | ||
msger._write = function( data, enc, done ) { | ||
tu.handle_response( seneca, data, client_options ) | ||
return done(); | ||
} | ||
return msger; | ||
} | ||
var msger = make_msger() | ||
var connections = [] | ||
var clientconnect = reconnect( function(client) { | ||
connections.push(client) | ||
client | ||
.pipe( json_parser_stream() ) | ||
.pipe( msger ) | ||
.pipe( json_stringify_stream() ) | ||
.pipe( client ) | ||
}).on('connect', function() { | ||
seneca.log.debug('client', type, 'connect', | ||
spec, topic, client_options) | ||
}).on('reconnect', function() { | ||
seneca.log.debug('client', type, 'reconnect', | ||
spec, topic, client_options) | ||
}).on('disconnect', function(err) { | ||
seneca.log.debug('client', type, 'disconnect', | ||
spec, topic, client_options, | ||
(err&&err.stack)||err) | ||
}).connect({ | ||
port: client_options.port, | ||
host: client_options.host | ||
}) | ||
send_done( null, function( args, done ) { | ||
var outmsg = tu.prepare_request( this, args, done ) | ||
msger.push( outmsg ) | ||
}) | ||
tu.close( seneca, function( done ) { | ||
clientconnect.disconnect() | ||
connections.forEach(function(con){ | ||
try { con.destroy() } catch(e) { seneca.log.error(e) } | ||
}) | ||
done() | ||
}) | ||
} | ||
} | ||
function json_parser_stream() { | ||
var json_parser = new stream.Duplex({objectMode:true}) | ||
json_parser.linebuf = [] | ||
json_parser._read = function() {} | ||
json_parser._write = function(data,enc,done) { | ||
var str = ''+data | ||
var endline = -1 | ||
var remain = 0 | ||
while( -1 != (endline = str.indexOf('\n',remain)) ) { | ||
this.linebuf.push( str.substring(remain,endline) ) | ||
var jsonstr = this.linebuf.join('') | ||
this.linebuf.length = 0 | ||
remain = endline+1 | ||
if( '' === jsonstr ) { | ||
return done(); | ||
} | ||
var outdata = tu.parseJSON( seneca, 'stream', jsonstr ) | ||
if( outdata ) { | ||
this.push(outdata) | ||
} | ||
} | ||
if( -1 == endline ) { | ||
this.linebuf.push(str.substring(remain)) | ||
} | ||
return done(); | ||
} | ||
return json_parser; | ||
} | ||
function json_stringify_stream() { | ||
var json_stringify = new stream.Duplex({objectMode:true}) | ||
json_stringify._read = function() {} | ||
json_stringify._write = function( data, enc, done ) { | ||
var out = tu.stringifyJSON( seneca, 'stream', data ) | ||
if( out ) { | ||
this.push(out+'\n') | ||
} | ||
done() | ||
} | ||
return json_stringify; | ||
} | ||
function hook_listen_web( args, done ) { | ||
var seneca = this | ||
var type = args.type | ||
var listen_options = seneca.util.clean(_.extend({},options[type],args)) | ||
var app = connect() | ||
app.use( timeout( listen_options.timeout ) ) | ||
// query params get injected into args | ||
// let's you use a GET for debug | ||
// GETs can have side-effects, this is not a web server, or a REST API | ||
app.use( query() ) | ||
app.use( function( req, res, next ) { | ||
var buf = [] | ||
req.setEncoding('utf8') | ||
req.on('data', function(chunk) { buf.push(chunk) }) | ||
req.on('end', function() { | ||
try { | ||
var bufstr = buf.join('') | ||
var bodydata = | ||
0 < bufstr.length ? tu.parseJSON(seneca,'req-body',bufstr) : {} | ||
if( util.isError(bodydata) ) { | ||
var out = tu.prepare_response(seneca,{}) | ||
out.input = bufstr | ||
out.error = error('invalid_json',{input:bufstr}) | ||
send_response(res,out,{}) | ||
return; | ||
} | ||
req.body = _.extend( | ||
{}, | ||
bodydata, | ||
(req.query && req.query.args$) ? jsonic(req.query.args$) : {}, | ||
req.query||{} ) | ||
next(); | ||
} | ||
catch(err) { | ||
err.body = err.message+': '+bufstr | ||
err.status = 400 | ||
next(err) | ||
} | ||
}) | ||
}) | ||
app.use( function( req, res, next ) { | ||
if( 0 !== req.url.indexOf(listen_options.path) ) return next(); | ||
var data | ||
var standard = !!req.headers['seneca-id'] | ||
if( standard ) { | ||
data = { | ||
id: req.headers['seneca-id'], | ||
kind: 'act', | ||
origin: req.headers['seneca-origin'], | ||
track: tu.parseJSON( | ||
seneca,'track-receive',req.headers['seneca-track']) || [], | ||
time: { | ||
client_sent: req.headers['seneca-time-client-sent'], | ||
}, | ||
act: req.body, | ||
} | ||
} | ||
// convenience for non-seneca clients | ||
else { | ||
data = { | ||
id: seneca.idgen(), | ||
kind: 'act', | ||
origin: req.headers['user-agent'] || 'UNKNOWN', | ||
track: [], | ||
time: { | ||
client_sent: Date.now() | ||
}, | ||
act: req.body, | ||
} | ||
} | ||
tu.handle_request( seneca, data, listen_options, function(out){ | ||
send_response(res,out,data) | ||
}) | ||
}) | ||
seneca.log.debug('listen', listen_options ) | ||
var listen = app.listen( listen_options.port, listen_options.host ) | ||
tu.close( seneca, function( done ) { | ||
listen.close() | ||
done() | ||
}) | ||
function send_response(res,out,data) { | ||
var outjson = "null" | ||
var iserror = false | ||
var httpcode = 200 | ||
if( null != out ) { | ||
if( out.res ) { | ||
outjson = tu.stringifyJSON(seneca,'listen-web',out.res) | ||
} | ||
else if( out.error ) { | ||
iserror = true | ||
outjson = tu.stringifyJSON(seneca,'listen-web',out.error) | ||
} | ||
} | ||
var headers = { | ||
'Content-Type': 'application/json', | ||
'Cache-Control': 'private, max-age=0, no-cache, no-store', | ||
'Content-Length': buffer.Buffer.byteLength(outjson), | ||
} | ||
headers['seneca-id'] = out ? out.id : seneca.id | ||
headers['seneca-kind'] = 'res' | ||
headers['seneca-origin'] = out ? out.origin : 'UNKNOWN' | ||
headers['seneca-accept'] = seneca.id | ||
headers['seneca-track'] = ''+(data.track ? data.track : []) | ||
headers['seneca-time-client-sent'] = | ||
out && out.item ? out.time.client_sent : '0' | ||
headers['seneca-time-listen-recv'] = | ||
out && out.item ? out.time.listen_recv : '0' | ||
headers['seneca-time-listen-sent'] = | ||
out && out.item ? out.time.listen_sent : '0' | ||
if( iserror ) { | ||
httpcode = 500 | ||
} | ||
res.writeHead( httpcode, headers ) | ||
res.end( outjson ) | ||
} | ||
done() | ||
} | ||
function hook_client_web( args, clientdone ) { | ||
var seneca = this | ||
var type = args.type | ||
var client_options = seneca.util.clean(_.extend({},options[type],args)) | ||
tu.make_client( seneca, make_send, client_options, clientdone ) | ||
function make_send( spec, topic, send_done ) { | ||
var fullurl = | ||
'http://'+client_options.host+':'+ | ||
client_options.port+client_options.path | ||
seneca.log.debug('client', 'web', 'send', spec, topic, client_options, | ||
fullurl ) | ||
send_done( null, function( args, done ) { | ||
var data = tu.prepare_request( this, args, done ) | ||
var headers = { | ||
'seneca-id': data.id, | ||
'seneca-kind': 'req', | ||
'seneca-origin': seneca.id, | ||
'seneca-track': tu.stringifyJSON( | ||
seneca,'send-track',data.track||[]), | ||
'seneca-time-client-sent': data.time.client_sent | ||
} | ||
needle.post( | ||
fullurl, | ||
data.act, | ||
{ | ||
json: true, | ||
headers: headers, | ||
timeout: client_options.timeout, | ||
}, | ||
function(err,res) { | ||
var data = { | ||
kind: 'res', | ||
res: res && _.isObject(res.body) ? res.body : null, | ||
error: err | ||
} | ||
if( res ) { | ||
data.id = res.headers['seneca-id'] | ||
data.origin = res.headers['seneca-origin'] | ||
data.accept = res.headers['seneca-accept'] | ||
data.time = { | ||
client_sent: res.headers['seneca-time-client-sent'], | ||
listen_recv: res.headers['seneca-time-listen-recv'], | ||
listen_sent: res.headers['seneca-time-listen-sent'], | ||
} | ||
if( 200 !== res.statusCode ) { | ||
data.error = res.body | ||
} | ||
} | ||
tu.handle_response( seneca, data, client_options ) | ||
} | ||
) | ||
}) | ||
tu.close( seneca, function( done ) { | ||
done() | ||
}) | ||
} | ||
} | ||
return { | ||
name: plugin, | ||
exportmap: { utils: tu }, | ||
options: options | ||
} | ||
} | ||
// Error code messages. | ||
function ERRMSGMAP() { | ||
return { | ||
'invalid_json':'Invalid JSON: <%=input%>.', | ||
} | ||
} |
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
New author
Supply chain riskA new npm collaborator published a version of the package for the first time. New collaborators are usually benign additions to a project, but do indicate a change to the security surface area of a package.
Found 1 instance in 1 package
Network access
Supply chain riskThis module accesses the network.
Found 1 instance in 1 package
11
5
898
50458
10
566
2
+ Addedndjson@1.4.x
+ Addedqs@5.2.0
+ Addedreconnect-core@1.1.x
+ Addedwreck@6.3.x
+ Addedboom@2.10.1(transitive)
+ Addedcore-util-is@1.0.3(transitive)
+ Addedgex@0.2.2(transitive)
+ Addedhoek@2.16.3(transitive)
+ Addedisarray@1.0.0(transitive)
+ Addedjsonic@0.2.2(transitive)
+ Addedlodash@3.10.03.10.14.15.0(transitive)
+ Addedlru-cache@2.7.3(transitive)
+ Addedminimist@1.2.8(transitive)
+ Addedndjson@1.4.4(transitive)
+ Addedpatrun@0.5.1(transitive)
+ Addedprocess-nextick-args@2.0.1(transitive)
+ Addedqs@5.2.0(transitive)
+ Addedreadable-stream@2.3.8(transitive)
+ Addedreconnect-core@1.1.0(transitive)
+ Addedsafe-buffer@5.1.2(transitive)
+ Addedsplit2@2.2.0(transitive)
+ Addedstring_decoder@1.1.1(transitive)
+ Addedthrough2@2.0.5(transitive)
+ Addedutil-deprecate@1.0.2(transitive)
+ Addedwreck@6.3.0(transitive)
+ Addedxtend@4.0.2(transitive)
- Removedasync@0.9.2
- Removedconnect@3.0.2
- Removedconnect-query@0.2.0
- Removedconnect-timeout@1.5.0
- Removedneedle@0.9.2
- Removedreconnect-net@0.0.0
- Removedasync@0.9.2(transitive)
- Removedconnect@3.0.2(transitive)
- Removedconnect-query@0.2.0(transitive)
- Removedconnect-timeout@1.5.0(transitive)
- Removeddebug@1.0.21.0.32.1.32.6.9(transitive)
- Removedescape-html@1.0.1(transitive)
- Removedfinalhandler@0.0.2(transitive)
- Removedgex@0.2.0(transitive)
- Removedhttp-errors@1.2.8(transitive)
- Removediconv-lite@0.4.24(transitive)
- Removedjsonic@0.2.0(transitive)
- Removedlru-cache@2.6.4(transitive)
- Removedms@0.6.20.7.02.0.0(transitive)
- Removedneedle@0.9.2(transitive)
- Removedon-headers@1.0.2(transitive)
- Removedparseurl@1.1.3(transitive)
- Removedpatrun@0.4.3(transitive)
- Removedqs@1.1.0(transitive)
- Removedreconnect-core@0.0.1(transitive)
- Removedreconnect-net@0.0.0(transitive)
- Removedsafer-buffer@2.1.2(transitive)
- Removedstatuses@1.5.0(transitive)
- Removedutils-merge@1.0.0(transitive)
Updatederaro@0.4.x
Updatedgex@0.2.x
Updatedjsonic@0.2.x
Updatedlodash@3.10.x
Updatedlru-cache@2.7.x
Updatedpatrun@0.5.x