Comparing version 0.4.1 to 0.12.0-alpha3
17
index.js
var argv = require('optimist').argv | ||
var _ = require('./lib/client/client') | ||
exports.Client = _.Client | ||
exports.Service = require('./lib/client/service').Service | ||
exports.Locator = require('./lib/client/locator').Locator | ||
exports.Worker = require('./lib/worker/worker').Worker | ||
@@ -15,2 +16,12 @@ | ||
return argv.uuid && argv.app && argv.locator && argv.endpoint | ||
} | ||
} | ||
exports.getWorkerAttrs = function(){ | ||
return { | ||
uuid: argv.uuid, | ||
app: argv.app, | ||
locator: argv.locator, | ||
endpoint: argv.endpoint | ||
} | ||
} | ||
@@ -9,4 +9,4 @@ | ||
var util = require('../util') | ||
var inspect = require("util").inspect | ||
var _mp = require('./mp') | ||
@@ -18,3 +18,3 @@ var unpackMessage = _mp.unpackMessage | ||
var trace = 0 | ||
var debug = require('../util').debug('co:channel') | ||
@@ -26,3 +26,3 @@ function notImplemented(){ | ||
function Channel(host, port){ | ||
trace && console.log('creating channel') | ||
debug('creating channel') | ||
this.owner = null | ||
@@ -43,2 +43,3 @@ this._socket = null | ||
this.connect() | ||
this._RPC = RPC | ||
} | ||
@@ -48,3 +49,3 @@ | ||
connect: function(){ | ||
trace && console.log('connecting channel') | ||
debug('connecting channel') | ||
__assert(!this._socket, '!this._socket') | ||
@@ -55,6 +56,6 @@ this._makeSocket() | ||
if(this._socketPath){ | ||
trace && console.log('connecting channel, path:', this._socketPath) | ||
debug('connecting channel, path:', this._socketPath) | ||
this._socket.connect(this._socketPath) | ||
} else if(this._host){ | ||
trace && console.log('connecting channel, [host,port]:', [this._host, this._port]) | ||
debug('connecting channel, [host,port]:', [this._host, this._port]) | ||
this._socket.connect(this._port, this._host) | ||
@@ -64,2 +65,6 @@ } | ||
_setProtocol: function(RPC){ | ||
this._RPC = RPC | ||
}, | ||
send: function(buf){ | ||
@@ -76,5 +81,5 @@ __assert(this._socket && Buffer.isBuffer(buf), 'this._socket && Buffer.isBuffer(buf)') | ||
"arguments.length === 3 && typeof sid === 'number' && typeof code === 'number' && typeof message === 'string'") | ||
this._socket.send(mp.pack([protocol.RPC.terminate, sid, [code, message]])) | ||
this._socket.send(mp.pack([sid, this._RPC.terminate, [code, message]])) | ||
}, | ||
close: function(){ | ||
@@ -105,10 +110,23 @@ if(this._socket){ | ||
// a hook method, needed for testing only. | ||
// don't touch! don't use! | ||
// it's only usage is in test/channel['on socket close just on write'] | ||
_injectSocket: function(sock){ | ||
__assert(!this._socket, '!this._socket') | ||
this._socket = sock | ||
sock.on('error', this._hdl.error) | ||
sock.on('data', this._hdl.data) | ||
sock.on('end', this._hdl.end) | ||
sock.on('close', this._hdl.close) | ||
}, | ||
_destroySocket: function(){ | ||
if(this._socket){ | ||
this._socket.removeListener('connect', this._hdl.connect) | ||
this._socket.removeListener('error', this._hdl.error) | ||
this._socket.removeListener('close', this._hdl.close) | ||
this._socket.removeListener('data', this._hdl.data) | ||
this._socket.destroy() | ||
var s = this._socket | ||
this._socket = null | ||
s.removeListener('connect', this._hdl.connect) | ||
s.removeListener('error', this._hdl.error) | ||
s.removeListener('close', this._hdl.close) | ||
s.removeListener('data', this._hdl.data) | ||
s.destroy() | ||
} | ||
@@ -126,12 +144,23 @@ }, | ||
connect: function(){ | ||
debug('channel.hdl.connect') | ||
this._socket.removeListener('connect', this._hdl.connect) | ||
this._socket.on('data', this._hdl.data) | ||
this._socket.on('end', this._hdl.end) | ||
this.on_connect() | ||
}, | ||
close: function(err){ | ||
end: function(){ | ||
debug('channel.hdl.end') | ||
if(!this._errorFired){ | ||
this._errorFired = true | ||
this.on_socket_error(errno.EBADF) | ||
this.on_socket_error(errno.ESHUTDOWN) | ||
} | ||
}, | ||
close: function(){ | ||
debug('channel.hdl.close') | ||
if(!this._errorFired){ | ||
this._errorFired = true | ||
this.on_socket_error(errno.EPIPE) | ||
} | ||
this._destroySocket() | ||
@@ -141,3 +170,4 @@ }, | ||
error: function(err){ | ||
var errno0 = errno[err.errno] | ||
debug('channel.hdl.error', err) | ||
var errno0 = errno[err.code] || errno.EBADF | ||
this._errorFired = true | ||
@@ -149,14 +179,35 @@ this.on_socket_error(errno0) | ||
message: function(m){ | ||
switch(m[0]){ | ||
debug('channel.hdl.message', m) | ||
if(!(typeof m === 'object' && m.length === 3)){ | ||
debug('RPC message is not a tuple', m) | ||
this._hdl.error({code: 'EBADMSG'}) | ||
return false | ||
} | ||
var sid = m[0], mid = m[1], args = m[2] | ||
if(!(typeof mid === 'number' | ||
&& typeof sid === 'number' | ||
&& (typeof args === 'object' && typeof args.length === 'number'))){ | ||
debug('bad RPC message tuple') | ||
this._hdl.error({code: 'EBADMSG'}) | ||
return false | ||
} | ||
switch(mid){ | ||
case RPC.heartbeat: { | ||
this.on_heartbeat() | ||
case this._RPC.heartbeat: { | ||
debug('case this._RPC.heartbeat') | ||
if(!(args.length === 0)){ | ||
debug('bad RPC.heartbeat message') | ||
} else { | ||
this.on_heartbeat() | ||
} | ||
break | ||
} | ||
case RPC.invoke: { | ||
var sid = m[1], event = m[2][0] | ||
if(!(typeof sid === 'number' && typeof event === 'string', | ||
"typeof sid === 'number' && typeof event === 'string'")){ | ||
trace && console.log('bad RPC.invoke message') | ||
case this._RPC.invoke: { | ||
debug('case this._RPC.invoke') | ||
var event = args[0] | ||
if(!(typeof event === 'string', | ||
"typeof event === 'string'")){ | ||
debug('bad RPC.invoke message') | ||
} else { | ||
@@ -168,7 +219,7 @@ this.on_invoke(sid, event) | ||
case RPC.chunk: { | ||
var sid = m[1], buf = m[2][0] | ||
if(!(typeof sid === 'number', Buffer.isBuffer(buf), | ||
"typeof sid === 'number', Buffer.isBuffer(buf)")){ | ||
trace && console.log('bad RPC.chunk message') | ||
case this._RPC.chunk: { | ||
debug('case this._RPC.chunk') | ||
var buf = args[0] | ||
if(!(Buffer.isBuffer(buf))){ | ||
debug('bad RPC.chunk message') | ||
} else { | ||
@@ -180,7 +231,5 @@ this.on_chunk(sid, buf) | ||
case RPC.choke: { | ||
var sid = m[1] | ||
if(!(typeof sid === 'number', | ||
"typeof sid === 'number'")){ | ||
trace && console.log('bad RPC.choke message') | ||
case this._RPC.choke: { | ||
if(!(args.length === 0)){ | ||
debug('bad RPC.choke message') | ||
} else { | ||
@@ -192,9 +241,8 @@ this.on_choke(sid) | ||
case RPC.error: { | ||
var errno = m[1], message = m[2][0] | ||
if(!(typeof errno === 'number', typeof message === 'string', | ||
"typeof errno === 'number', typeof message === 'string'")){ | ||
trace && console.log('bad RPC.error message', m) | ||
case this._RPC.error: { | ||
var errno0 = args[0], message = args[1] | ||
if(!(typeof errno0 === 'number' && typeof message === 'string')){ | ||
debug('bad RPC.error message', m) | ||
} else { | ||
this.on_error(errno, message) | ||
this.on_error(sid, errno0, message) | ||
} | ||
@@ -204,7 +252,6 @@ break | ||
case RPC.terminate: { | ||
var code = m[1], reason = m[2][0] | ||
if(!(typeof code === 'number', typeof message === 'string', | ||
"typeof code === 'number', typeof message === 'string'")){ | ||
trace && console.log('bad RPC.terminate message', m) | ||
case this._RPC.terminate: { | ||
var code = args[0], reason = args[1] | ||
if(!(typeof code === 'number' && typeof reason === 'string')){ | ||
debug('bad RPC.terminate message', m) | ||
} else { | ||
@@ -217,5 +264,6 @@ this.on_terminate(code, reason) | ||
default: { | ||
trace && console.log('discarding unknown message type', m) | ||
debug('discarding a message of unknown type', m) | ||
} | ||
} | ||
return true | ||
}, | ||
@@ -225,28 +273,43 @@ | ||
__assert(Buffer.isBuffer(buf), 'Buffer.isBuffer(buf)') | ||
trace && console.log('channel got data', buf) | ||
debug('channel got data', buf) | ||
if(this._inBuffer){ | ||
debug('previous data present') | ||
this._inBuffer = Buffer.concat([this._inBuffer, buf]) | ||
} else { | ||
debug('no previous data present') | ||
this._inBuffer = buf | ||
} | ||
var m | ||
while(m = unpackMessage(this._inBuffer)){ | ||
while(true){ | ||
m = unpackMessage(this._inBuffer, this._RPC) | ||
debug('unpacked message', inspect(m, {depth:null})) | ||
if(!m){ | ||
break | ||
} | ||
debug('channel got message', m) | ||
if(m === _mpFail){ | ||
trace && console.log('bad message framing') | ||
this.close() | ||
debug('bad message framing') | ||
//we close it just as if is of some unexpected form | ||
//this.close() | ||
} else { | ||
debug('unpackMessage.bytesParsed', unpackMessage.bytesParsed) | ||
var remaining = this._inBuffer.length - unpackMessage.bytesParsed | ||
debug('remaining', remaining) | ||
} | ||
// this is a bit cryptic. | ||
// what we do here is, if the message is of some very unexpected form, | ||
// don't do anything more. Error handling is done elsewhere, | ||
// here we have to just bail out of this handler | ||
if(!this._hdl.message(m)){ | ||
return | ||
} | ||
trace && console.log('worker got message', m) | ||
trace && console.log('unpackMessage.bytesParsed', remaining) | ||
var remaining = this._inBuffer.length - unpackMessage.bytesParsed | ||
trace && console.log('remaining', remaining) | ||
this._hdl.message(m) | ||
if(0 < remaining){ | ||
trace && console.log('remaining buffer', this._inBuffer.slice(unpackMessage.bytesParsed)) | ||
debug('remaining buffer', this._inBuffer.slice(unpackMessage.bytesParsed)) | ||
this._inBuffer = this._inBuffer.slice(this._inBuffer.length - remaining) | ||
@@ -253,0 +316,0 @@ } else { |
@@ -10,4 +10,7 @@ | ||
var trace = 0 | ||
var debug = require('../util').debug('co:mp') | ||
function unpackMessage(buf){ | ||
var inspect = require('util').inspect | ||
function unpackMessage(buf, _RPC){ | ||
if(buf.length === 0){ | ||
@@ -21,23 +24,32 @@ return | ||
var _RPC = _RPC || RPC | ||
var parsed = 1 | ||
var sessionId = parseInt(buf, parsed) | ||
debug('parser got sessionId', sessionId) | ||
if(sessionId === null){ | ||
// not enough bytes | ||
return null | ||
} else if(sessionId === fail){ | ||
return fail | ||
} | ||
parsed += parseInt.bytesParsed | ||
var method = parseInt(buf, parsed) | ||
trace && console.log('parser got method', method) | ||
debug('parser got method', method) | ||
if(method === null){ | ||
// not enough bytes | ||
return null | ||
} else if(sessionId === fail){ | ||
return fail | ||
} | ||
parsed += parseInt.bytesParsed | ||
if(method === RPC.chunk){ | ||
var sid = parseInt(buf, parsed) | ||
if(sid === fail){ | ||
return fail | ||
} | ||
if(method === _RPC.chunk){ | ||
parsed += parseInt.bytesParsed | ||
trace && console.log('parsed sid,len', sid, parseInt.bytesParsed) | ||
if(!buf[parsed] === 0x91){ | ||
trace && console.log('buf[parsed] === 0x91 '+buf.slice(parsed).toString()) | ||
debug('buf[parsed] === 0x91 '+buf.slice(parsed).toString()) | ||
return fail | ||
@@ -53,14 +65,14 @@ } | ||
} | ||
trace && console.log('before parseBuffer, parsed', parsed) | ||
debug('before parseBuffer, parsed', parsed) | ||
parsed += parseBuffer.bytesParsed | ||
trace && console.log('parseBuffer.bytesParsed', parseBuffer.bytesParsed) | ||
debug('parseBuffer.bytesParsed', parseBuffer.bytesParsed) | ||
unpackMessage.bytesParsed = parsed | ||
return [method, sid, [buf1]] | ||
return [sessionId, method, [buf1]] | ||
} else { | ||
var m = mp.unpack(buf) | ||
trace && console.log('mp.unpack.bytes_remaining', mp.unpack.bytes_remaining) | ||
trace && console.log('unpacked', m) | ||
debug('mp.unpack.bytes_remaining', mp.unpack.bytes_remaining) | ||
debug('unpacked', inspect(m, {depth: null})) | ||
if(m === null){ | ||
@@ -215,3 +227,3 @@ return m | ||
default: | ||
trace && console.log('parse int failed: '+hexy.hexy(b.slice(i))) | ||
debug('parse int failed: '+hexy.hexy(b.slice(i))) | ||
return fail | ||
@@ -234,6 +246,5 @@ } | ||
var len = b0 & (0x20-1) | ||
if(len0 < len+1+1){ | ||
if(len0 < len+1){ | ||
return null | ||
} | ||
trace && console.log('parsed buffer len', len) | ||
var val = b.slice(i+1,i+1+len) | ||
@@ -254,2 +265,3 @@ parseBuffer.bytesParsed = 1+len | ||
var len = (b[i+1]<<24) + (b[i+2]<<16) + (b[i+3]<<8) + (b[i+4]) | ||
debug('parseBuffer-raw32 len:', len) | ||
if(len0 < len+1+4){ | ||
@@ -263,3 +275,3 @@ return null | ||
default: | ||
trace && console.log('parse binary buffer failed: '+hexy.hexy(b.slice(o))) | ||
debug('parse binary buffer failed: '+hexy.hexy(b.slice(i))) | ||
return fail | ||
@@ -271,1 +283,3 @@ } | ||
module.exports.unpackMessage = unpackMessage | ||
module.exports.fail = fail | ||
var EventEmitter = require('events').EventEmitter | ||
var __assert = require('assert') | ||
var util = require('util') | ||
var makeError = require('../util').makeError | ||
var channel_binding = require('../channel/channel').Channel | ||
var _ = require('../errno'), ERRNO = _.errno, _ERRNO = _.code | ||
var FSM = require('../fsm') | ||
var Session = require('./session').Session | ||
var Channel = require('./channel').Channel | ||
var __stop = require('./graph').__stop | ||
var mp = require('msgpack') | ||
var util = require('../util') | ||
function BaseService(){ | ||
this.__sid = 1 | ||
this._sessions = {} | ||
this.__graph = null | ||
var makeError = util.makeError | ||
this._channel = null | ||
this._connecting = false | ||
// the invariant is: | ||
// (!this._channel && !this._connecting) | ||
// || (this._channel && this._connecting) | ||
// || (this._channel && !this._connecting) | ||
var self = this | ||
this._hdl = { | ||
message: function(){ | ||
self.hdl.message.apply(self, arguments) | ||
} | ||
} | ||
var trace = 0 | ||
} | ||
var BaseService = FSM.define({ | ||
methods:{ | ||
_send: function(message){ | ||
var buf = mp.pack(message) | ||
this._handle.send(buf) | ||
}, | ||
_setHandle: function(handle){ | ||
this._handle = handle | ||
handle.owner = this | ||
}, | ||
_closeHandle: function(){ | ||
this._handle.close() | ||
util.unsetHandlers(this._handle, this.__fsmdef[this._state].handlers) | ||
this._handle.owner = null | ||
this._handle = null | ||
}, | ||
removeSession: function(s){ | ||
var id = s._id | ||
delete this._sessions[id] | ||
}, | ||
_resetSessions: function(errno){ | ||
Object.keys(this._sessions).forEach(function(sid){ | ||
var s = this._sessions[sid] | ||
s.pushError(errno, _ERRNO[errno]) | ||
delete this._sessions[sid] | ||
},this) | ||
BaseService.prototype = { | ||
__proto__: EventEmitter.prototype, | ||
connect: function(endpoints){ | ||
// consecutively connect to endpoints | ||
if(this._channel){ | ||
console.log('this._channel', new Error().stack) | ||
} | ||
}, | ||
__assert(!this._channel) | ||
startState: 'closed', | ||
states:{ | ||
var self = this | ||
closed:{ | ||
invariant: function(){return this._handle === null && this._error === null}, | ||
methods:{ | ||
connect: function(endpoint){ | ||
trace && console.log('connecting to', endpoint) | ||
if(Array.isArray(endpoint)){ | ||
__assert(typeof endpoint[0] === 'string' && typeof endpoint[1] === 'number', "endpoint is ['host|ip', port]") | ||
try{ | ||
var channelHandle = new channel_binding(endpoint[0], endpoint[1]) | ||
} catch (e){ | ||
if(typeof e === 'number'){ | ||
e = new Error(_ERRNO[e]) | ||
} | ||
throw e | ||
} | ||
this._setHandle(channelHandle) | ||
} else { | ||
__assert(typeof endpoint === 'string', "assume endpoint is a string path to unix socket") | ||
try{ | ||
var channelHandle = new channel_binding(endpoint) | ||
} catch(e){ | ||
if(typeof e === 'number'){ | ||
e = new Error(_ERRNO[e]) | ||
} | ||
throw e | ||
} | ||
this._setHandle(channelHandle) | ||
} | ||
this._setState('connecting') | ||
} | ||
var i = 0 | ||
_nextEndpoint() | ||
function _nextEndpoint(){ | ||
if(i < endpoints.length){ | ||
var endpoint = endpoints[i++] | ||
self._channel = new Channel(endpoint[0], endpoint[1]) | ||
self._channel.owner = this | ||
self._channel.on_connect = _on_connect | ||
self._channel.on_socket_error = _on_connect_error | ||
//self._channel.connect(endpoint) | ||
} else { | ||
var err = new Error('can not connect to any of the endpoints: '+util.inspect(endpoints)) | ||
err.code = 'ECONNREFUSED' | ||
self.emit('error', err) | ||
} | ||
}, | ||
} | ||
function _on_connect(){ | ||
self._channel.on_connect = null | ||
self._channel.on_socket_error = _on_socket_error | ||
self._channel.on_message = self._hdl.message | ||
self.emit('connect') | ||
} | ||
connecting:{ | ||
invariant: function(){return this._handle !== null && this._error === null}, | ||
methods:{ | ||
close:function(){ | ||
this._closeHandle() | ||
this._setState('closed') | ||
} | ||
}, | ||
function _on_connect_error(){ | ||
var ch = self._channel | ||
self._channel = null | ||
ch.owner = null | ||
ch.on_connect = null | ||
ch.on_socket_error = null | ||
handlers:{ | ||
on_socket_error:function(errno){ | ||
var _this = this.owner | ||
_this._closeHandle() | ||
var e = makeError(errno) | ||
_this._error = e | ||
_this._setState('error') | ||
_this._resetSessions(errno) | ||
_this._emit('error', e)}, | ||
_nextEndpoint() | ||
} | ||
on_connect:function(){ | ||
var _this = this.owner | ||
_this._setState('connected') | ||
_this._emit('connect') | ||
} | ||
} | ||
}, | ||
function _on_socket_error(errno){ | ||
var ch = self._channel | ||
self._channel = null | ||
ch.owner = null | ||
ch.on_connect = null | ||
ch.on_socket_error = null | ||
var err = makeError(errno) | ||
self.resetSessions(err) | ||
self.emit('error', err) | ||
} | ||
connected:{ | ||
invariant: function(){return this._handle !== null && this._error === null}, | ||
methods:{ | ||
send:function(msg){ | ||
this._send(msg) | ||
}, | ||
}, | ||
close:function(){ | ||
this._closeHandle() | ||
this._setState('closed') | ||
this._resetSessions(ERRNO.ECONNRESET) | ||
hdl: { | ||
message: function(m){ | ||
var sid = m[0], mid = m[1], args = m[2] | ||
var s = this._sessions[sid] | ||
if(s){ | ||
s.push(m) | ||
} | ||
} | ||
}, | ||
close: function(err){ | ||
if(this._channel){ | ||
if(this._connecting){ | ||
// cancel connect | ||
var ch = this._channel | ||
this._channel = null | ||
ch.owner = null | ||
ch.close() | ||
} else { | ||
// or close connection and reset sessions | ||
var ch = this._channel | ||
this._channel = null | ||
ch.owner = null | ||
ch.close() | ||
if(!err){ | ||
var err = new Error('connection closed by application') | ||
err.code = 'ECONNRESET' | ||
} | ||
}, | ||
this.resetSessions(err) | ||
} | ||
} | ||
}, | ||
handlers:{ | ||
on_socket_error:function(errno){ | ||
var _this = this.owner | ||
trace && console.log('socket error <%s>', _this._name, _ERRNO[errno]) | ||
var e = makeError(errno) | ||
_this._closeHandle() | ||
_this._setState('closed') | ||
_this._resetSessions(errno) | ||
_this._emit('error', e) | ||
}, | ||
_setGraph: function(_graph){ | ||
var graph_ = {} | ||
for(var k in _graph){ | ||
var idx = parseInt(k) | ||
var methodGraph = _graph[k] | ||
var methodName = methodGraph[0] | ||
var txGraph = methodGraph[1] | ||
var rxGraph = methodGraph[2] | ||
graph_[methodName] = [idx, txGraph, rxGraph] | ||
} | ||
this.__graph = graph_ | ||
}, | ||
on_chunk: function(sid, chunk){ | ||
var _this = this.owner | ||
var s = _this._sessions[sid] | ||
if(s){ | ||
s.pushChunk(chunk) | ||
} | ||
}, | ||
_send: function(m){ | ||
this._channel.send(mp.pack(m)) | ||
}, | ||
on_choke: function(sid){ | ||
trace && console.log('choke',sid) | ||
var _this = this.owner | ||
var s = _this._sessions[sid] | ||
if(s){ | ||
delete _this._sessions[sid] | ||
s.pushChoke() | ||
} | ||
}, | ||
_call: function(methodName, args){ | ||
on_error: function(sid, code, message){ | ||
trace && console.log('error',sid, code, message) | ||
var _this = this.owner | ||
var s = _this._sessions[sid] | ||
if(s){ | ||
delete _this._sessions[sid] | ||
s.pushError(code, message) | ||
} | ||
} | ||
} | ||
}, | ||
error:{ | ||
methods:{ | ||
invariant: function(){return this._handle === null && this._error === null}, | ||
close:function(){ | ||
this._error = null | ||
this._setState('closed') | ||
} | ||
} | ||
__assert(methodName in this.__graph && typeof args === 'object' && typeof args.length === 'number') | ||
var methodDef = this.__graph[methodName] | ||
var mid = methodDef[0] | ||
var txGraph = methodDef[1] | ||
var rxGraph = methodDef[2] | ||
var s = new Session(this.__sid++, txGraph, rxGraph) | ||
s._owner = this | ||
this._send([s._id, mid, args]) | ||
if(Object.keys(rxGraph).length !== 0){ | ||
this._sessions[s._id] = s | ||
} | ||
} | ||
}) | ||
return s | ||
}, | ||
clearSession: function(id){ | ||
if(id in this._sessions){ | ||
delete this._sessions[id] | ||
} | ||
}, | ||
resetSessions: function(err){ | ||
var ss = this._sessions | ||
this._sessions = {} | ||
module.exports = { | ||
BaseService: BaseService | ||
for(var id in ss){ | ||
ss[id].error(err) | ||
} | ||
} | ||
} | ||
module.exports.BaseService = BaseService |
var trace = 0 | ||
var __assert = require('assert') | ||
@@ -8,24 +6,25 @@ var EventEmitter = require('events').EventEmitter | ||
var debug = require('../util').debug('co:logger') | ||
var Service = require('./service').Service | ||
var VL = {ignore:0, error:1, warn: 2, info: 3, debug: 4} | ||
// logging priorities | ||
var LP = { debug: 0, info: 1, warning: 2, error: 3 } | ||
var slice = Array.prototype.slice | ||
function logMethod(level){ | ||
function logMethod(priority){ | ||
return function(){ | ||
if(level <= this._verbosity){ | ||
if(typeof arguments[arguments.length-1] === 'function'){ | ||
// attrs, format, args | ||
debug('call method with priority', priority, this._verbosity) | ||
if(this._verbosity <= priority){ | ||
// method priority greater than logger verbosity | ||
if(typeof arguments[0] === 'object'){ | ||
var args = slice.call(arguments) | ||
var cb = args.pop() | ||
var attrs = args.shift() | ||
var message = format.apply(null, args) | ||
this._logging.emit(level, this._target, message, cb) | ||
this._logging.emit(priority, this._target, message, attrs) | ||
} else { | ||
var args = arguments | ||
var message = format.apply(null, args) | ||
this._logging.emit(level, this._target, message) | ||
var message = format.apply(null, arguments) | ||
this._logging.emit(priority, this._target, message) | ||
} | ||
if(VL.debug <= level){ | ||
console.log(message) | ||
} | ||
} | ||
@@ -39,5 +38,5 @@ } | ||
this._client = null | ||
this._verbosity = LP.warning | ||
this._state = 'closed' | ||
this.setAppName(app) | ||
this._verbosity = VL.warn | ||
this._state = 'closed' | ||
} | ||
@@ -49,5 +48,6 @@ | ||
var _this = this | ||
if(this._state === 'closed'){ | ||
this._state = 'connecting' | ||
this._logging = new (Service('logging'))() | ||
this._logging = Service('logging') | ||
if(this._client){ | ||
@@ -57,3 +57,3 @@ this._logging._client = this._client | ||
this._logging.connect() | ||
trace && console.log('Logger: connecting to service logging') | ||
debug('connecting to service logging') | ||
this._logging.once('connect', _getVerbosity) | ||
@@ -65,3 +65,3 @@ this._logging.once('error', _handleConnectError) | ||
__assert(_this._state === 'connecting') | ||
console.log('_handleConnectError') | ||
debug('_handleConnectError') | ||
_this._logging.removeListener('connect', _getVerbosity) | ||
@@ -77,25 +77,29 @@ if(_this._logging._state !== 'closed'){ | ||
__assert(_this._state === 'connecting', format("_this._state === 'connecting'; state %s",_this._state)) | ||
trace && console.log('Logger: connected to service logging') | ||
debug('connected to service logging') | ||
_this._logging.removeListener('error', _handleConnectError) | ||
_this._logging.on('error', _handleSocketError) | ||
_this._logging.verbosity(_verbosityDone) | ||
} | ||
function _verbosityDone(err, verbosity){ | ||
__assert(_this._state !== 'connected', format('state %s',_this._state)) | ||
if(_this._state === 'connecting'){ | ||
if(err) { | ||
_this._logging.removeListener('error', _handleSocketError) | ||
_this.close() | ||
return _this.emit('error', err) | ||
_this._logging.verbosity().recv({ | ||
value: function(verbosity){ | ||
debug('got verbosity', verbosity) | ||
if(_this._state === 'connecting'){ | ||
_this._verbosity = verbosity | ||
_this._state = 'connected' | ||
_this.emit('connect') | ||
} | ||
}, | ||
error: function(code, reason){ | ||
if(_this._state === 'connecting'){ | ||
var err = new Error(reason) | ||
err.code = code | ||
_this._logging.removeListener('error', _handleSocketError) | ||
_this.emit('error', err) | ||
} | ||
} | ||
_this._verbosity = verbosity | ||
_this._state = 'connected' | ||
_this.emit('connect') | ||
} | ||
}) | ||
} | ||
function _handleSocketError(err){ | ||
trace && console.log('handleSocketError') | ||
__assert(_this._state === 'connecting' || _this._state === 'connected') | ||
debug('handleSocketError') | ||
__assert(_this._state === 'connecting' || _this._state === 'connected', | ||
"_this._state === 'connecting' || _this._state === 'connected'") | ||
_this.close() | ||
@@ -118,6 +122,6 @@ _this.emit('error', err) | ||
error: logMethod(VL.error), | ||
warn: logMethod(VL.warn), | ||
info: logMethod(VL.info), | ||
debug: logMethod(VL.debug) | ||
error: logMethod(LP.error), | ||
warning: logMethod(LP.warning), | ||
info: logMethod(LP.info), | ||
debug: logMethod(LP.debug) | ||
@@ -124,0 +128,0 @@ } |
@@ -15,3 +15,3 @@ | ||
function Session(id, cb){ | ||
function Session(id, cb, methodId, methodName){ | ||
var _this = this | ||
@@ -21,3 +21,4 @@ this._cb = cb | ||
this._owner = null | ||
this._methodId = undefined | ||
this._methodId = methodId | ||
this._methodName = methodName | ||
this._args = null | ||
@@ -32,6 +33,2 @@ this._done = false | ||
_this._done = true | ||
if(_this._owner){ | ||
_this._owner.removeSession(_this) | ||
_this._owner = null | ||
} | ||
var err = new Error('call timeout') | ||
@@ -76,3 +73,3 @@ _this._cb(err) | ||
if(typeof this._cb === 'function'){ | ||
var err = new Error(util.format('%s\n in service %s method %s\n args: %s\n', message, owner._name, this._methodId, util.inspect(this._args))) | ||
var err = new Error(util.format('<Service %s>.%s error: "%s" args: %s', owner._name, this._methodName, message, util.inspect(this._args))) | ||
err.code = code | ||
@@ -85,5 +82,5 @@ this._cb(err) | ||
module.exports.unpackWith = function(unpacker){ | ||
return function(mid){ | ||
return function(methodId, methodName){ | ||
return function(){ | ||
trace && console.log('================ unpackWith, calling method %s', mid) | ||
trace && console.log('================ unpackWith, calling method [%s]<%s>', methodId, methodName) | ||
__assert(this._state === 'connected') | ||
@@ -96,11 +93,10 @@ var args = slice.call(arguments) | ||
args.pop() | ||
var S = new Session(sid, cb) | ||
var S = new Session(sid, cb, methodId, methodName) | ||
S._unpacker = unpacker | ||
S._owner = this | ||
S._args = args | ||
S._methodId = mid | ||
this._sessions[sid] = S | ||
} | ||
} | ||
this.send([mid, sid, args]) | ||
this.send([methodId, sid, args]) | ||
} | ||
@@ -110,5 +106,5 @@ } | ||
module.exports.unpacking = function(mid){ | ||
module.exports.unpacking = function(methodId, methodName){ | ||
return function(){ | ||
trace && console.log('================ unpacking, calling method %s', mid) | ||
trace && console.log('================ unpackWith, calling method [%s]<%s>', methodId, methodName) | ||
__assert(this._state === 'connected') | ||
@@ -121,10 +117,9 @@ var args = slice.call(arguments) | ||
args.pop() | ||
var S = new Session(sid, cb) | ||
var S = new Session(sid, cb, methodId, methodName) | ||
S._owner = this | ||
S._args = args | ||
S._methodId = mid | ||
this._sessions[sid] = S | ||
} | ||
} | ||
this.send([mid, sid, args]) | ||
this.send([methodId, sid, args]) | ||
} | ||
@@ -131,0 +126,0 @@ } |
@@ -17,3 +17,3 @@ | ||
function Session(id, fiber){ | ||
function Session(id, fiber, methodId, methodName){ | ||
__assert(typeof id === 'number', 'no id passed to session') | ||
@@ -24,3 +24,4 @@ __assert(fiber, 'no fiber passed to session') | ||
this._owner = null | ||
this._methodId = undefined | ||
this._methodId = methodId | ||
this._methodName = methodName | ||
this._args = null | ||
@@ -35,6 +36,2 @@ this._done = false | ||
_this._done = true | ||
if(_this._owner){ | ||
_this._owner.removeSession(_this) | ||
_this._owner = null | ||
} | ||
var err = new Error('call timeout') | ||
@@ -71,3 +68,3 @@ _this.fiber.throwInto(err) | ||
clearTimeout(this._callTimer) | ||
var err = new Error(util.format('%s\n in service %s method %s\n args: %s\n', message, owner._name, this._methodId, util.inspect(this._args))) | ||
var err = new Error(util.format('<Service %s>.%s error: "%s" args: %s', owner._name, this._methodName, message, util.inspect(this._args))) | ||
err.code = code | ||
@@ -80,13 +77,12 @@ this.fiber.thowInto(err) | ||
unpackWith: function(unpacker){ | ||
return function(mid){ | ||
return function(methodId, methodName){ | ||
return function(){ | ||
var args = slice.call(arguments) | ||
var sid = this.__sid++ | ||
var S = new Session(sid, Fiber.current) | ||
var S = new Session(sid, Fiber.current, methodId, methodName) | ||
S._unpacker = unpacker | ||
S._owner = this | ||
S._methodId = mid | ||
S._args = args | ||
this._sessions[S._id] = S | ||
this._send([mid, S._id, args]) | ||
this._send([methodId, S._id, args]) | ||
return Fiber.yield() | ||
@@ -96,12 +92,11 @@ } | ||
}, | ||
unpacking: function(mid){ | ||
unpacking: function(methodId, methodName){ | ||
return function(){ | ||
var args = slice.call(arguments) | ||
var sid = this.__sid++ | ||
var S = new Session(sid, Fiber.current) | ||
var S = new Session(sid, Fiber.current, methodId, methodName) | ||
S._owner = this | ||
S._methodId = mid | ||
S._args = args | ||
this._sessions[S._id] = S | ||
this._send([mid, S._id, args]) | ||
this._send([methodId, S._id, args]) | ||
return Fiber.yield() | ||
@@ -108,0 +103,0 @@ } |
@@ -17,3 +17,3 @@ | ||
function Session(id, deferred){ | ||
function Session(id, deferred, methodId, methodName){ | ||
__assert(typeof id === 'number', 'no id passed to session') | ||
@@ -29,3 +29,4 @@ __assert(deferred, 'no deferred passed to session') | ||
this._owner = null | ||
this._methodId = undefined | ||
this._methodId = methodId | ||
this._methodName = methodName | ||
var _this = this | ||
@@ -35,6 +36,2 @@ this._timeoutHandler = function(){ | ||
_this._done = true | ||
if(_this._owner){ | ||
_this._owner.removeSession(_this) | ||
_this._owner = null | ||
} | ||
var err = new Error('call timeout') | ||
@@ -71,3 +68,3 @@ Promise.reject(_this.deferred, err) | ||
clearTimeout(this._callTimer) | ||
var err = new Error(util.format('%s\n in service %s method %s\n args: %s\n', message, owner._name, this._methodId, util.inspect(this._args))) | ||
var err = new Error(util.format('<Service %s>.%s error: "%s" args: %s', owner._name, this._methodName, message, util.inspect(this._args))) | ||
err.code = code | ||
@@ -80,15 +77,14 @@ Promise.reject(this.deferred, err) | ||
unpackWith: function(unpacker){ | ||
return function(mid){ | ||
return function(methodId, methodName){ | ||
return function(){ | ||
trace && console.log('================ unpackWith, calling method %s of service %s', mid, this._name) | ||
trace && console.log('================ unpackWith, calling method %s of service %s', methodId, this._name) | ||
var args = slice.call(arguments) | ||
var d = Promise.defer() | ||
var sid = this.__sid++ | ||
var S = new Session(sid, d) | ||
var S = new Session(sid, d, methodId, methodName) | ||
S._unpacker = unpacker | ||
S._args = args | ||
S._owner = this | ||
S._methodId = mid | ||
this._sessions[S._id] = S | ||
this.send([mid, S._id, args]) | ||
this.send([methodId, S._id, args]) | ||
return Promise.promise(S.deferred) | ||
@@ -98,14 +94,13 @@ } | ||
}, | ||
unpacking: function(mid){ | ||
unpacking: function(methodId, methodName){ | ||
return function(){ | ||
trace && console.log('================ unpackWith, calling method %s of service %s', mid, this._name) | ||
trace && console.log('================ unpackWith, calling method %s of service %s', methodId, this._name) | ||
var args = slice.call(arguments) | ||
var d = Promise.defer() | ||
var sid = this.__sid++ | ||
var S = new Session(sid, d) | ||
var S = new Session(sid, d, methodId, methodName) | ||
S._args = args | ||
S._owner = this | ||
S._methodId = mid | ||
this._sessions[S._id] = S | ||
var msg = [mid, S._id, args] | ||
var msg = [methodId, S._id, args] | ||
this.send(msg) | ||
@@ -112,0 +107,0 @@ return Promise.promise(S.deferred) |
@@ -16,15 +16,13 @@ | ||
function ClientSession(id){ | ||
function ClientSession(sid, methodId, methodName){ | ||
Session.apply(this) | ||
var _this = this | ||
this._id = id | ||
this._id = sid | ||
this._done = false | ||
this._callTimer = 0 | ||
this._methodId = methodId | ||
this._methodName = methodName | ||
this._timeoutHandler = function(){ | ||
__assert(!_this._done) | ||
_this._done = true | ||
if(_this._owner){ | ||
_this._owner.removeSession(_this) | ||
_this._owner = null | ||
} | ||
var err = new Error('call timeout') | ||
@@ -45,14 +43,28 @@ } | ||
this.push(chunk) | ||
}, | ||
pushChoke: function(){ | ||
var r = Session.apply(this, arguments) | ||
this._owner = null | ||
return r | ||
}, | ||
pushError: function(code, message){ | ||
var owner = this._owner | ||
this._owner = null | ||
var err = new Error(util.format('<Service %s>.%s error: "%s" args: %s', owner._name, this._methodName, message, util.inspect(this._args))) | ||
err.code = code | ||
this.emit('error', err) | ||
this.close() | ||
} | ||
} | ||
module.exports = function(mid){ | ||
module.exports = function(methodId, methodName){ | ||
return function(){ | ||
trace && console.log('================ calling method %s', mid) | ||
trace && console.log('================ calling method %s, id %s', methodName, methodId) | ||
__assert(this._state === 'connected') | ||
var args = slice.call(arguments) | ||
var sid = this.__sid++ | ||
var S = new ClientSession(sid) | ||
var S = new ClientSession(sid, methodId, methodName) | ||
S._owner = this | ||
this._sessions[S._id] = S | ||
this.send([mid, sid, args]) | ||
this.send([methodId, sid, args]) | ||
return S | ||
@@ -59,0 +71,0 @@ } |
exports.makeService = makeService | ||
exports.bakeServiceProto = bakeServiceProto, | ||
exports.Service = Service | ||
exports.serviceConnect = serviceConnect | ||
var PROTOCOL_VERSION = 1 | ||
var __assert = require('assert') | ||
var isIP = require('net').isIP | ||
var dns = require('dns') | ||
var EventEmitter = require('events').EventEmitter | ||
var util = require('util') | ||
var Locator = require('./locator').Locator | ||
var BaseService = require('./base_service').BaseService | ||
var debug = require('../util').debug('co:client:service') | ||
var methods = require('./methods/callback') | ||
var Client = require('./client').Client | ||
var client // = module.exports.client = new Client() // can't do this because | ||
// of cyclic dependency client->service->client, so it's done in | ||
// _resolve of serviceConnect routine | ||
var trace = 0 | ||
var slice = Array.prototype.slice | ||
function makeService(name, result, def){ | ||
var proto = bakeServiceProto(name, result, def) | ||
proto.__proto__ = BaseService.prototype, | ||
Client.prototype = proto | ||
return Client | ||
function Client(){ | ||
BaseService.apply(this, arguments) | ||
this._sessions = {} | ||
function resolve(name, locator, cb){ | ||
__assert(typeof name === 'string' && (arguments.length === 2 || arguments.length === 3)) | ||
if(arguments.length === 2){ | ||
cb = locator | ||
locator = undefined | ||
} | ||
__assert(typeof cb === 'function') | ||
} | ||
var L = locator || new Locator() | ||
var done = false | ||
function loadServiceDefinition(name, methods){ | ||
trace && console.log('loadServiceDefinition', slice.call(arguments)) | ||
__assert(typeof name === 'string', "typeof name === 'string'") | ||
var modName = './services/'+name | ||
try { | ||
var def = require(modName) | ||
trace && console.log('loaded service definition', name, def) | ||
if(typeof def === 'function'){ | ||
return def(methods) | ||
} else { | ||
return def | ||
L.connect() | ||
L.once('connect', _onConnect) | ||
function _onConnect(){ | ||
L.resolve(name, function(err, endpoints, version, graph){ | ||
L.removeListener('error', _onError) | ||
L.close() | ||
if(!done){ | ||
done = true | ||
if(err){ | ||
cb(err) | ||
} else { | ||
cb(null, endpoints, version, graph) | ||
} | ||
} | ||
}) | ||
} | ||
L.on('error', _onError) | ||
function _onError(err){ | ||
L.removeListener('error', _onError) | ||
L.removeListener('connect', _onConnect) | ||
L.close() | ||
if(!done){ | ||
cb(err) | ||
} | ||
} catch (e){ | ||
if(e.code === 'MODULE_NOT_FOUND'){ | ||
trace && console.log('module '+modName+' not found') | ||
return {defaultMethod: methods.unpacking} | ||
} else { | ||
throw e | ||
} | ||
} | ||
} | ||
function bakeServiceProto(name, result, def, proto){ | ||
function bakeMethods(service, graph){ | ||
var __methods = service.__methods | ||
trace && console.log('>>> bakeServiceProto:', arguments) | ||
Object.keys(graph).some(function(k){ | ||
var methodName = graph[k][0] | ||
__methods[methodName] = function(){ | ||
var args = slice.apply(arguments) | ||
return this._service._call(methodName, args) | ||
} | ||
}) | ||
var endpoint = result[0] | ||
var protocolVer = result[1] | ||
var methods = result[2] | ||
__assert(protocolVer === PROTOCOL_VERSION, | ||
'protocolVer === PROTOCOL_VERSION') | ||
proto = proto || {} | ||
proto._name = name | ||
proto._endpoint = endpoint | ||
for(var mid in methods){ | ||
var _ | ||
var methodName = methods[mid] | ||
mid = parseInt(mid) | ||
var M = ((_ = def.methods) && _[methodName]) || def.defaultMethod | ||
proto[methodName] = M(mid) | ||
} | ||
proto.__svcproto && (proto.__svcproto = undefined) | ||
return proto | ||
} | ||
function Service(name){ | ||
function Service(name, def, methods0){ | ||
trace && console.log('Service', slice.call(arguments)) | ||
if(typeof def === 'string'){ | ||
trace && console.log('definition: string', def) | ||
def = loadServiceDefinition(def, methods0 || methods) | ||
} else if(typeof def === 'undefined'){ | ||
trace && console.log('definition: undefined') | ||
def = loadServiceDefinition(name, methods0 || methods) | ||
} else { | ||
trace && console.log('definition: object', def) | ||
__assert(typeof def === 'object', "typeof def === 'object'") | ||
//def = loadServiceDefinition(name, methods0 || methods) | ||
function ServiceClient(){ | ||
this._name = name | ||
this._service = new BaseService() | ||
this._onSocketError = this._onSocketError.bind(this) | ||
this._connected = false | ||
this._connecting = false | ||
} | ||
function Service(){ | ||
BaseService.apply(this, arguments) | ||
this._sessions = {} | ||
this._lookingup = false | ||
this._client = null | ||
this.__definition = def | ||
var methods = ServiceClient.prototype = { | ||
__proto__: serviceClientPrototype, | ||
__methods: null | ||
} | ||
var proto = Service.prototype = { | ||
__proto__: BaseService.prototype, | ||
__svcproto: undefined, | ||
_name: name, | ||
connect: serviceConnect | ||
} | ||
methods.__methods = methods | ||
proto.__svcproto = proto | ||
return new ServiceClient() | ||
return Service | ||
} | ||
var baseServiceConnect = BaseService.prototype.connect | ||
function serviceConnect(){ | ||
var _this = this | ||
var done = false | ||
var serviceClientPrototype = { | ||
__proto__: EventEmitter.prototype, | ||
_onSocketError: function(err){ | ||
debug('on socket error', err) | ||
__assert(this._connected) | ||
this._connected = false | ||
this._service.close() | ||
this._emit('error', err) | ||
}, | ||
if(this._lookingup) return | ||
this._lookingup = true | ||
if(!this._endpoint){ | ||
_resolve() | ||
} else { | ||
_checkIP() | ||
} | ||
_emit: EventEmitter.prototype.emit, | ||
function _resolve(){ | ||
trace && console.log('_resolve', _this._name, _this._endpoint) | ||
if(!_this._client && client === undefined){ | ||
client = new Client() | ||
} | ||
(_this._client || client).resolve(_this._name, _resolveDone) | ||
} | ||
connect: function(){ | ||
if(!this._connecting && !this._connected){ | ||
this._connecting = true | ||
var self = this | ||
resolve(this._name, function(err, endpoints, version, graph){ | ||
if(err){ | ||
self._emit('error', err) | ||
} else { | ||
self.__graph = graph | ||
self._service._setGraph(graph) | ||
bakeMethods(self, graph) | ||
function _resolveDone(err, result){ | ||
trace && console.log('_resolveDone', arguments) | ||
trace && console.log('svcproto',_this.__svcproto) | ||
if(err) return _handleError(err) | ||
var p = bakeServiceProto(_this._name, result, | ||
_this.__definition, | ||
_this.__svcproto || _this.__proto__) | ||
trace && console.log('baked proto', p) | ||
_checkIP() | ||
} | ||
self._setEndpoints(endpoints) | ||
self._service.connect(self._endpoints) | ||
self._service.on('connect', _onConnect) | ||
self._service.on('error', _onConnectError) | ||
function _checkIP(){ | ||
trace && console.log('_checkIP', _this._endpoint) | ||
if(Array.isArray(_this._endpoint) && !isIP(_this._endpoint[0])){ | ||
trace && console.log('not ip:', _this._endpoint[0]) | ||
dns.lookup(_this._endpoint[0], _lookupDone) | ||
} else { | ||
_connect() | ||
function _onConnect(){ | ||
self._service.removeListener('connect', _onConnect) | ||
self._service.removeListener('error', _onConnectError) | ||
self._service.on('error', self._onSocketError) | ||
self._connecting = false | ||
self._connected = true | ||
self._emit('connect') | ||
} | ||
function _onConnectError(err){ | ||
self._service.removeListener('connect', _onConnect) | ||
self._service.removeListener('error', _onConnectError) | ||
self._service.close() | ||
self._connecting = false | ||
self._emit('error', err) | ||
} | ||
} | ||
}) | ||
} | ||
} | ||
}, | ||
function _lookupDone(err, address, family){ | ||
trace && console.log('_lookupDone', arguments) | ||
if(err) return _handleError(err) | ||
_this._endpoint[0] = address | ||
_connect() | ||
} | ||
function _handleError(err){ | ||
trace && console.log('_handleError', arguments) | ||
if(!done){ | ||
done = true | ||
this._lookingup = false | ||
_this.emit('error', err) | ||
close: function(){ | ||
if(this._connected){ | ||
this._connected = false | ||
this._service.close() | ||
} else { | ||
trace && console.log('_handleError called after done', arguments) | ||
debug('not connected') | ||
} | ||
}, | ||
_setEndpoints: function(endpoints){ | ||
this._endpoints = endpoints | ||
} | ||
function _connect(){ | ||
trace && console.log('_connect') | ||
if(!done){ | ||
done = true | ||
_this._lookingup = false | ||
baseServiceConnect.call(_this, _this._endpoint) | ||
} else { | ||
trace && console.log('_connect called after done') | ||
} | ||
} | ||
} | ||
module.exports.Service = Service | ||
var __assert = require('assert') | ||
var format = require('util').format | ||
var EventEmitter = require('events').EventEmitter | ||
var ErrorEmitter = require('./error_emitter').ErrorEmitter | ||
@@ -12,3 +12,3 @@ var util = require('./util') | ||
function FSM(){ | ||
EventEmitter.apply(this, arguments) | ||
ErrorEmitter.apply(this, arguments) | ||
def.methods.__init__ && def.methods.__init__.apply(this, arguments) | ||
@@ -18,3 +18,3 @@ this._state = def.startState || 'start' | ||
this._error = null | ||
this.__sid = 0 | ||
this.__sid = 1 | ||
} | ||
@@ -25,3 +25,4 @@ | ||
__fsmdef: def.states, | ||
_emit: EventEmitter.prototype.emit | ||
_emit: ErrorEmitter.prototype.emit, | ||
_beforeError: ErrorEmitter.prototype.beforeError | ||
} | ||
@@ -32,2 +33,6 @@ | ||
} | ||
if(def.errorHandlers){ | ||
proto.errorHandlers = def.errorHandlers | ||
} | ||
@@ -75,4 +80,10 @@ for(var methodName in def.methods){ | ||
var FSMPrototype = { | ||
__proto__: EventEmitter.prototype, | ||
__proto__: ErrorEmitter.prototype, | ||
_makeError: function(errno, message){ | ||
var e = util.makeError(errno, message) | ||
e.state = this._state | ||
return e | ||
}, | ||
_setState: function(state){ | ||
@@ -79,0 +90,0 @@ trace && console.log('state: %s->%s', this._state, state) |
@@ -8,3 +8,2 @@ | ||
var RPC = protocol.RPC | ||
var _RPC = protocol._RPC | ||
@@ -20,2 +19,3 @@ var dbg = 0 | ||
this.choked = false | ||
this._RPC = RPC | ||
} | ||
@@ -26,2 +26,7 @@ | ||
__proto__:Stream.prototype, | ||
_setProtocol: function(RPC){ | ||
this._RPC = RPC | ||
}, | ||
pause:function(){ | ||
@@ -82,6 +87,6 @@ this.paused = true | ||
if(Buffer.isBuffer(data)){ | ||
var msg = mp.pack([_RPC.chunk,this._id,[data]]) | ||
var msg = mp.pack([this._id,this._RPC.chunk,[data]]) | ||
} else { | ||
__assert(typeof data === 'string') | ||
var msg = mp.pack([_RPC.chunk,this._id,[Buffer(data)]]) | ||
var msg = mp.pack([this._id,this._RPC.chunk,[Buffer(data)]]) | ||
} | ||
@@ -98,3 +103,3 @@ var hdl = this.owner._handle | ||
var hdl = this.owner._handle | ||
hdl.send(mp.pack([_RPC.choke,this._id,[]])) | ||
hdl.send(mp.pack([this._id,this._RPC.choke,[]])) | ||
this.close() | ||
@@ -106,3 +111,3 @@ }, | ||
typeof message === 'string') | ||
hdl.send(mp.pack([_RPC.error,this._id,[code,message]])) | ||
hdl.send(mp.pack([this._id,this._RPC.error,[code,message]])) | ||
this.close() | ||
@@ -109,0 +114,0 @@ }, |
@@ -17,2 +17,3 @@ | ||
this._hdl = {} | ||
this._RPC = RPC | ||
Duplex.call(this) | ||
@@ -24,3 +25,6 @@ util.bindHandlers(this.hdl,this._hdl,this) | ||
__proto__:Duplex.prototype, | ||
_setProtocol: function(RPC){ | ||
this._RPC = RPC | ||
}, | ||
// using .push() provided by Duplex | ||
@@ -50,7 +54,7 @@ | ||
if(Buffer.isBuffer(chunk)){ | ||
var msg = mp.pack([RPC.chunk,this._id,[chunk]]) | ||
var msg = mp.pack([this._id,this._RPC.chunk,[chunk]]) | ||
} else { | ||
__assert(typeof chunk === 'string' | ||
&& typeof encoding === 'string') | ||
var msg = mp.pack([RPC.chunk,this._id,[new Buffer(chunk,encoding)]]) | ||
var msg = mp.pack([this._id,this._RPC.chunk,[new Buffer(chunk,encoding)]]) | ||
} | ||
@@ -62,3 +66,3 @@ this.owner._handle.send(msg) | ||
var r = Duplex.prototype.end.apply(this,arguments) | ||
this.owner._handle.send(mp.pack([RPC.choke,this._id,[]])) | ||
this.owner._handle.send(mp.pack([this._id,this._RPC.choke,[]])) | ||
return r | ||
@@ -69,3 +73,3 @@ }, | ||
var hdl = this.owner._handle | ||
hdl.send(mp.pack([RPC.error,this._id,[code,message]])) | ||
hdl.send(mp.pack([this._id,this._RPC.error,[code,message]])) | ||
this.close() | ||
@@ -72,0 +76,0 @@ }, |
var __assert = require('assert') | ||
var _ERRNO = require('./errno').code | ||
var ERRNO = require('./errno').errno | ||
var format = require('util').format | ||
@@ -7,3 +11,4 @@ | ||
makeError: function(errno, message){ | ||
var e = new Error(message || '') | ||
message = message || _ERRNO[errno] || 'unknown' | ||
var e = new Error(message) | ||
e.code = _ERRNO[errno] | ||
@@ -28,4 +33,65 @@ return e | ||
} | ||
}, | ||
debug:function(subsys){ | ||
var re = new RegExp(subsys) | ||
if(process.env.NODE_DEBUG && re.test(process.env.NODE_DEBUG)){ | ||
return function(){ | ||
var entry = format.apply(null, arguments) | ||
console.log('%s:', subsys, entry) | ||
} | ||
} else { | ||
return function(){} | ||
} | ||
}, | ||
// link in proto chain `.prop` subobjects in | ||
// `self` and all it's proto-predecessors | ||
linkProtoProps: function(self, prop){ | ||
__assert(typeof self === 'object' && typeof prop === 'string', | ||
"typeof self === 'object' && typeof prop === 'string'") | ||
var p0, p1 | ||
p0 = self | ||
// find first proto with prop | ||
while(!p0.hasOwnProperty(prop)){ | ||
// we need to go deeper: | ||
p0 = p0.__proto__ | ||
if(p0 === null){ | ||
return | ||
} | ||
} | ||
propVal0 = p0[prop] | ||
if(typeof propVal0 !== 'object' || propVal0 === null){ | ||
return | ||
} | ||
p1 = p0.__proto__ | ||
while(p1 !== null){ | ||
if(p1.hasOwnProperty(prop)){ | ||
var propVal1 = p1[prop] | ||
if(typeof propVal1 === 'object' && propVal1 !== null){ | ||
// link | ||
propVal0.__proto__ = propVal1 | ||
// advance state | ||
p0 = p1 | ||
propVal0 = propVal1 | ||
// go deeper | ||
p1 = p0.__proto__ | ||
} else { | ||
break | ||
} | ||
} else { | ||
p1 = p1.__proto__ | ||
} | ||
} | ||
} | ||
} | ||
@@ -242,3 +242,3 @@ | ||
this._worker._handle.send(mp.pack([RPC.chunk,this._id,[chunk]])) | ||
this._worker._handle.send(mp.pack([this._id,RPC.chunk,[chunk]])) | ||
var req = new WriteReq(this,chunk) | ||
@@ -268,3 +268,3 @@ this._write_reqs.push(req) | ||
this._worker._handle.send(mp.pack([RPC.choke,this._id,[]])) | ||
this._worker._handle.send(mp.pack([this._id,RPC.choke,[]])) | ||
if(typeof cb === 'function'){ | ||
@@ -271,0 +271,0 @@ this.close = cb // the exact behavior of node::HandleWrap::Close |
@@ -17,3 +17,3 @@ | ||
var trace = 0 | ||
var trace = 1 | ||
@@ -51,2 +51,3 @@ var Worker = FSM.define({ | ||
s.owner = this | ||
s._setProtocol(RPC) | ||
return s | ||
@@ -95,2 +96,3 @@ }, | ||
handle.owner = this | ||
this._handle._setProtocol(RPC) | ||
}, | ||
@@ -124,3 +126,4 @@ | ||
__assert(this._state === 'connected', "this._state === 'connected'") | ||
this._handle.send(mp.pack([RPC.heartbeat, 0, []])) | ||
var sid = this.__sid++ | ||
this._handle.send(mp.pack([sid, RPC.heartbeat, []])) | ||
this._heartbeatTimer = setTimeout(this._handlers.sendNextHeartbeat, this._heartbeatInterval) | ||
@@ -165,3 +168,4 @@ } | ||
var _this = this.owner | ||
_this._handle.send(mp.pack([RPC.handshake, 0, [_this._uuid]])) | ||
var sid = _this.__sid++ | ||
_this._handle.send(mp.pack([sid, RPC.handshake, [_this._uuid]])) | ||
_this._setState('connected') | ||
@@ -188,5 +192,11 @@ _this._handlers.sendNextHeartbeat() | ||
// js app wants worker to shut down, for some good or bad reason | ||
terminate: function(normal, reason){ | ||
var state = normal? TERMINATE.normal: TERMINATE.abnormal | ||
var msg = mp.encode([RPC.terminate, 0, [state, reason]]) | ||
terminate: function(abnormal, reason){ | ||
var state = abnormal? TERMINATE.abnormal: TERMINATE.normal | ||
if(typeof reason !== 'string'){ | ||
reason = 'self-terminating' | ||
} | ||
var sid = this.__sid++ | ||
var msg = mp.pack([sid, RPC.terminate, [state, reason]]) | ||
console.log('sending terminate message') | ||
console.log(msg) | ||
this._handle.send(msg) | ||
@@ -288,2 +298,4 @@ this._setState('selfTerminated') | ||
terminate: function(){ | ||
var sid = this.__sid++ | ||
var msg = mp.encode([sid, RPC.terminate, [TERMINATE.normal, 'worker shut down']]) | ||
this._handle.send(msg) | ||
@@ -314,2 +326,3 @@ this._setState('terminated') | ||
_this._resetSessions(errno) | ||
var e = util.makeError(errno) | ||
_this.events.emit('error', e) | ||
@@ -337,2 +350,3 @@ } | ||
_this._resetSessions(errno) | ||
var e = util.makeError(errno) | ||
_this.events.emit('error', e) | ||
@@ -372,2 +386,3 @@ }, | ||
_this._resetSessions(errno) | ||
var e = util.makeError(errno) | ||
_this.events.emit('error', e) | ||
@@ -374,0 +389,0 @@ } |
{ | ||
"name": "cocaine", | ||
"version": "0.4.1", | ||
"version": "0.12.0-alpha3", | ||
"description": "Node.js framework for Cocaine platform", | ||
@@ -19,7 +19,11 @@ "author": "Cocaine Project <cocaine@yandex-team.ru>", | ||
"jampack": "0.0.7", | ||
"msgpack": "~0.1.7", | ||
"msgpack-bin": "0.2.x", | ||
"optimist": "~0.4", | ||
"q": "~0.9" | ||
}, | ||
"devDependencies": {} | ||
"devDependencies": { | ||
"mocha":"1.20.x", | ||
"msgpack-buf": "0.1.8", | ||
"node-uuid":"1.4.x" | ||
} | ||
} |
var mp = require('msgpack') | ||
var cli = new (require('../lib/client/client').Client)('10.11.12.13:10053') | ||
var Service = require('../lib/client/service').Service | ||
var app = cli.Service('diunko_did_node-js-sample', 'app') | ||
cli.on('error', function(err){ | ||
console.log('client error', err) | ||
}) | ||
var app = Service('test') | ||
app.on('error', function(err){ | ||
console.log('app error', err) | ||
}) | ||
app.connect() | ||
app.on('connect', function(){ | ||
var s = app.enqueue('http', mp.pack(['GET','/','HTTP/1.0',[['some-header','value']],''])) | ||
var rq = app.enqueue('http') | ||
s.on('data', function(data){ | ||
console.log('reply chunk',data) | ||
console.log(' which decodes',mp.unpack(data)) | ||
}) | ||
rq.send.write(mp.pack(['GET','/','HTTP/1.0',[['some-header','value']],''])) | ||
s.on('end', function(){ | ||
console.log('reply done') | ||
rq.recv({ | ||
write: function(data){ | ||
console.log('data', data) | ||
}, | ||
error: function(code, message){ | ||
console.log('error<%s,%s>', code, message) | ||
}, | ||
close: function(){ | ||
console.log('close') | ||
} | ||
}) | ||
s.on('error', function(err){ | ||
console.log('reply error', err) | ||
}) | ||
}) | ||
app.on('error', function(err){ | ||
console.log('client<test> error', err) | ||
}) | ||
var Logger = require('../lib/logger').Logger | ||
var Logger = require('../lib/client/logger').Logger | ||
@@ -15,3 +15,3 @@ var L = new Logger('testtest') | ||
setInterval(function(){ | ||
L.debug('testmessage ',i++) | ||
L.debug({count: i++}, 'testmessage ') | ||
},1000) | ||
@@ -22,3 +22,3 @@ } | ||
console.log('================') | ||
console.log(arguments) | ||
console.log('logger error', err) | ||
L.close() | ||
@@ -25,0 +25,0 @@ setTimeout(function(){ |
Sorry, the diff of this file is not supported yet
Native code
Supply chain riskContains native code (e.g., compiled binaries or shared libraries). Including native code can obscure malicious behavior.
Found 1 instance in 1 package
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
Dynamic require
Supply chain riskDynamic require can indicate the package is performing dangerous or unsafe dynamic code execution.
Found 1 instance in 1 package
Filesystem access
Supply chain riskAccesses the file system, and could potentially read sensitive data.
Found 1 instance in 1 package
Mixed license
License(Experimental) Package contains multiple licenses.
Found 1 instance in 1 package
Network access
Supply chain riskThis module accesses the network.
Found 1 instance in 1 package
Dynamic require
Supply chain riskDynamic require can indicate the package is performing dangerous or unsafe dynamic code execution.
Found 1 instance in 1 package
1710327
127
6484
3
2
3
9
6
+ Addedmsgpack-bin@0.2.x
+ Addedmsgpack-bin@0.2.6(transitive)
+ Addednan@1.9.0(transitive)
- Removedmsgpack@~0.1.7