Comparing version 1.0.0 to 1.1.0
@@ -177,8 +177,8 @@ "use strict"; | ||
var cancel = function () { | ||
packets.unacceptByJob('WORK_STATUS', jobid); | ||
packets.unacceptByJob('WORK_WARNING', jobid); | ||
packets.unacceptByJob('WORK_DATA', jobid); | ||
packets.unacceptByJob('WORK_FAIL', jobid); | ||
packets.unacceptByJob('WORK_EXCEPTION', jobid); | ||
packets.unacceptByJob('WORK_COMPLETE', jobid); | ||
packets.removeByJob('WORK_STATUS', jobid); | ||
packets.removeByJob('WORK_WARNING', jobid); | ||
packets.removeByJob('WORK_DATA', jobid); | ||
packets.removeByJob('WORK_FAIL', jobid); | ||
packets.removeByJob('WORK_EXCEPTION', jobid); | ||
packets.removeByJob('WORK_COMPLETE', jobid); | ||
}; | ||
@@ -185,0 +185,0 @@ packets.acceptByJob('WORK_STATUS', jobid, function (data) { |
@@ -12,3 +12,3 @@ { | ||
"author": "Rebecca Turner <me@re-becca.org> (http://re-becca.org)", | ||
"version": "1.0.0", | ||
"version": "1.1.0", | ||
"dependencies": { | ||
@@ -15,0 +15,0 @@ "bluebird": "^2.1.3", |
"use strict"; | ||
var util = require('util'); | ||
var events = require('events'); | ||
var stream = require('stream'); | ||
var streamToBuffer = require('./stream-to-buffer'); | ||
@@ -8,35 +8,19 @@ var AbraxasError = require('./errors'); | ||
var PacketHandler = module.exports = function () { | ||
events.EventEmitter.call(this); | ||
this.defaultHandler = {}; | ||
this.eventQueue = {}; | ||
this.mapped = {}; | ||
var self = this; | ||
this.queueEventListener = function (data) { | ||
var event = data.type.name; | ||
self.eventQueue[event].shift().apply(null,arguments); | ||
self.handleEmptyEventQueue(event); | ||
} | ||
this.acceptByJobEventListener = function(data) { | ||
var event = data.type.name; | ||
var id = data.args.job; | ||
if (self.mapped[event][id]) { | ||
self.mapped[event][id].forEach(function(cb){ cb(data) }); | ||
} | ||
else { | ||
self.emit('unknown',data); | ||
} | ||
} | ||
this.serialHandler = {}; | ||
this.handler = {}; | ||
this.byJobHandler = {}; | ||
stream.Writable.call(this,{objectMode: true}) | ||
} | ||
util.inherits(PacketHandler, events.EventEmitter); | ||
util.inherits(PacketHandler, stream.Writable); | ||
PacketHandler.prototype.handleEmptyEventQueue = function(event) { | ||
if (this.eventQueue[event].length) return; | ||
if (this.defaultHandler[event]) { | ||
this.removeListener(event, this.queueEventListener); | ||
this.on(event, this.defaultHandler[event]); | ||
} | ||
else { | ||
this.removeListener(event, this.queueEventListener); | ||
} | ||
PacketHandler.prototype._write = function (packet, encoding, callback) { | ||
callback(); | ||
var name = packet.type.name; | ||
var job = packet.args.job; | ||
if (job && this.byJobHandler[job] && this.byJobHandler[job][name]) return this.byJobHandler[job][name](packet); | ||
if (this.handler[name]) return this.handler[name](packet); | ||
if (this.serialHandler[name] && this.serialHandler[name].length) return this.serialHandler[name].shift()(packet); | ||
if (this.defaultHandler[name]) return this.defaultHandler[name](packet); | ||
this.emit('unknown', packet); | ||
} | ||
@@ -47,30 +31,28 @@ | ||
this.defaultHandler[event] = callback; | ||
if (this.eventQueue[event] && this.eventQueue[event].length) return; | ||
this.on(event, this.defaultHandler[event]); | ||
} | ||
PacketHandler.prototype.removeDefault = function (event) { | ||
if (! this.defaultHandler[event]) throw new Error("Tried to remove a default handler for "+event+" but none was set"); | ||
this.removeListener(event, this.defaultHandler[event]); | ||
if (! this.defaultHandler[event]) throw new Error("Tried to remove a default handler for "+event+" packet but none was set"); | ||
delete this.defaultHandler[event]; | ||
} | ||
PacketHandler.prototype.accept = function (event, callback) { | ||
if (this.handler[event]) throw new Error("Tried to register a second default handler for the "+event+" packet"); | ||
this.handler[event] = callback; | ||
} | ||
PacketHandler.prototype.removeHandler = function (event) { | ||
if (! this.handler[event]) throw new Error("Tried to remove a default handler for "+event+" packet but none was set"); | ||
delete this.handler[event]; | ||
} | ||
PacketHandler.prototype.acceptSerial = function (event, callback) { | ||
if (!this.eventQueue[event]) this.eventQueue[event] = []; | ||
if (!this.eventQueue[event].length) { | ||
this.on(event, this.queueEventListener); | ||
if (this.defaultHandler[event]) this.removeListener(event, this.defaultHandler[event]); | ||
} | ||
this.eventQueue[event].push(callback); | ||
if (!this.serialHandler[event]) this.serialHandler[event] = []; | ||
this.serialHandler[event].push(callback); | ||
} | ||
PacketHandler.prototype.constructError = function (data,callback) { | ||
streamToBuffer(data.body,function(err,body) { | ||
if (err) { | ||
callback(new AbraxasError.Receive(err)); | ||
} | ||
else { | ||
callback(new AbraxasError.Server(data.args.errorcode,body)); | ||
} | ||
}); | ||
PacketHandler.prototype.removeSerial = function (event, callback) { | ||
if (!this.serialHandler[event]) return; | ||
if (!this.serialHandler[event].length) return; | ||
this.serialHandler[event] = this.serialHandler[event].filter(function(handler) { return callback!==handler }); | ||
} | ||
@@ -81,7 +63,7 @@ | ||
var success = function (data) { | ||
self.unacceptSerial('ERROR', failure); | ||
self.removeSerial('ERROR', failure); | ||
callback(null, data); | ||
} | ||
var failure = function (data) { | ||
self.unacceptSerial(event, success); | ||
self.removeSerial(event, success); | ||
self.constructError(data, callback); | ||
@@ -93,25 +75,12 @@ } | ||
PacketHandler.prototype.unacceptSerial = function (event, callback) { | ||
if (!this.eventQueue[event]) return; | ||
if (!this.eventQueue[event].length) return; | ||
this.eventQueue[event] = this.eventQueue[event].filter(function(handler) { return callback!==handler }); | ||
this.handleEmptyEventQueue(event); | ||
} | ||
PacketHandler.prototype.acceptByJob = function (event, id, callback) { | ||
if (!this.mapped[event]) this.mapped[event] = {}; | ||
if (!this.mapped[event][id]) { | ||
this.on(event, this.acceptByJobEventListener); | ||
this.mapped[event][id] = []; | ||
} | ||
this.mapped[event][id].push(callback); | ||
if (!this.byJobHandler[id]) this.byJobHandler[id] = {}; | ||
if (this.byJobHandler[id][event]) throw new Error("Tried to register job "+id+" handler for "+event+" packet"); | ||
this.byJobHandler[id][event] = callback; | ||
} | ||
PacketHandler.prototype.unacceptByJob = function (event, id, callback) { | ||
if (this.mapped[event]) { | ||
delete this.mapped[event][id]; | ||
} | ||
if (Object.keys(this.mapped[event]).length == 0) { | ||
this.removeListener(event, this.acceptByJobEventListener); | ||
} | ||
PacketHandler.prototype.removeByJob = function (event, id, callback) { | ||
if (!this.byJobHandler[id]) throw new Error("Tried to unregister "+event+" handler for job "+id+" but job doesn't exist"); | ||
delete this.byJobHandler[id][event]; | ||
if (!Object.keys(this.byJobHandler[id]).length) delete this.byJobHandler[id]; | ||
} | ||
@@ -122,8 +91,8 @@ | ||
var success = function(packet) { | ||
self.unacceptByJob(event, id); | ||
self.unacceptSerial('ERROR',failure); | ||
self.removeByJob(event, id); | ||
self.removeSerial('ERROR',failure); | ||
callback(null,packet); | ||
} | ||
var failure = function (packet) { | ||
self.unacceptByJob(event, id); | ||
self.removeByJob(event, id); | ||
self.constructError(packet, callback); | ||
@@ -134,1 +103,12 @@ } | ||
} | ||
PacketHandler.prototype.constructError = function (data,callback) { | ||
streamToBuffer(data.body,function(err,body) { | ||
if (err) { | ||
callback(new AbraxasError.Receive(err)); | ||
} | ||
else { | ||
callback(new AbraxasError.Server(data.args.errorcode,body)); | ||
} | ||
}); | ||
} |
@@ -21,3 +21,3 @@ "use strict"; | ||
var self = this; | ||
this.packets.on('OPTION_REQ', function (data) { | ||
this.packets.accept('OPTION_REQ', function (data) { | ||
if (self.feature[data.args.option] != null) { | ||
@@ -32,19 +32,19 @@ self.feature[data.args.option] = true; | ||
this.packets.on('ECHO_REQ', function (data) { | ||
this.packets.accept('ECHO_REQ', function (data) { | ||
self.socket.write({kind:'response',type:packet.types['ECHO_RES'],body:data.body}); | ||
}); | ||
this.packets.on('CAN_DO', function (data) { | ||
this.packets.accept('CAN_DO', function (data) { | ||
self.emit('add-worker',data.args.function,self,{timeout:0}); | ||
}); | ||
this.packets.on('CAN_DO_TIMEOUT', function (data) { | ||
this.packets.accept('CAN_DO_TIMEOUT', function (data) { | ||
self.emit('add-worker',data.args.function,self,{timeout:data.args.timeout}); | ||
}); | ||
this.packets.on('CANT_DO', function (data) { | ||
this.packets.accept('CANT_DO', function (data) { | ||
self.emit('remove-worker',data.args.function,self); | ||
}); | ||
this.packets.on('RESET_ABILITIES', function () { | ||
this.packets.accept('RESET_ABILITIES', function () { | ||
self.emit('remove-all-workers', self); | ||
@@ -57,3 +57,3 @@ }); | ||
this.packets.on('PRE_SLEEP', function (data) { | ||
this.packets.accept('PRE_SLEEP', function (data) { | ||
self.status = 'sleeping'; | ||
@@ -63,6 +63,6 @@ self.emit('sleeping'); | ||
this.packets.on('SET_CLIENT_ID', function (data) { | ||
this.packets.accept('SET_CLIENT_ID', function (data) { | ||
self.clientid = data.args.workerid; | ||
}); | ||
this.packets.on('SUBMIT_JOB', function (data) { | ||
this.packets.accept('SUBMIT_JOB', function (data) { | ||
self.emit('submit-job', { | ||
@@ -76,3 +76,3 @@ client: self, | ||
}); | ||
this.packets.on('SUBMIT_JOB_HIGH', function (data) { | ||
this.packets.accept('SUBMIT_JOB_HIGH', function (data) { | ||
self.emit('submit-job', { | ||
@@ -86,3 +86,3 @@ client: self, | ||
}); | ||
this.packets.on('SUBMIT_JOB_LOW', function (data) { | ||
this.packets.accept('SUBMIT_JOB_LOW', function (data) { | ||
self.emit('submit-job', { | ||
@@ -96,3 +96,3 @@ client: self, | ||
}); | ||
this.packets.on('SUBMIT_JOB_BG', function (data) { | ||
this.packets.accept('SUBMIT_JOB_BG', function (data) { | ||
self.emit('submit-job', { | ||
@@ -107,3 +107,3 @@ client: self, | ||
}); | ||
this.packets.on('SUBMIT_JOB_HIGH_BG', function (data) { | ||
this.packets.accept('SUBMIT_JOB_HIGH_BG', function (data) { | ||
self.emit('submit-job', { | ||
@@ -118,3 +118,3 @@ client: self, | ||
}); | ||
this.packets.on('SUBMIT_JOB_LOW_BG', function (data) { | ||
this.packets.accept('SUBMIT_JOB_LOW_BG', function (data) { | ||
self.emit('submit-job', { | ||
@@ -129,29 +129,29 @@ client: self, | ||
}); | ||
this.packets.on('GET_STATUS', function (data) { | ||
this.packets.accept('GET_STATUS', function (data) { | ||
self.emit('get-status',data.args.job,client); | ||
}); | ||
this.packets.on('GRAB_JOB', function (data) { | ||
this.packets.accept('GRAB_JOB', function (data) { | ||
self.status='active'; | ||
self.emit('grab-job',self,false); | ||
}); | ||
this.packets.on('GRAB_JOB_UNIQ', function (data) { | ||
this.packets.accept('GRAB_JOB_UNIQ', function (data) { | ||
self.status='active'; | ||
self.emit('grab-job',self,true); | ||
}); | ||
this.packets.on('WORK_COMPLETE', function (data) { | ||
this.packets.accept('WORK_COMPLETE', function (data) { | ||
self.server.workComplete(self,data.args.job,data.body); | ||
}); | ||
this.packets.on('WORK_DATA', function (data) { | ||
this.packets.accept('WORK_DATA', function (data) { | ||
self.server.workData(self,data.args.job,data.body); | ||
}); | ||
this.packets.on('WORK_STATUS', function (data) { | ||
this.packets.accept('WORK_STATUS', function (data) { | ||
self.server.workStatus(self,data.args.job,data.args.complete,data.args.total); | ||
}); | ||
this.packets.on('WORK_FAIL', function (data) { | ||
this.packets.accept('WORK_FAIL', function (data) { | ||
self.server.workFail(self,data.args.job); | ||
}); | ||
this.packets.on('WORK_EXCEPTION', function (data) { | ||
this.packets.accept('WORK_EXCEPTION', function (data) { | ||
self.server.workException(self,data.args.job,data.body); | ||
}); | ||
this.packets.on('WORK_WARNING', function (data) { | ||
this.packets.accept('WORK_WARNING', function (data) { | ||
self.server.workWarning(self,data.args.job,data.body); | ||
@@ -158,0 +158,0 @@ }); |
@@ -94,6 +94,4 @@ "use strict"; | ||
input.on('data',function(data){ | ||
if (events.EventEmitter.listenerCount(self.packets,data.type.name)) return self.packets.emit(data.type.name, data); | ||
self.emit('unknown-packet', data.type.name, data); | ||
}); | ||
input.pipe(packets); | ||
packets.on('unknown', function (packet) { self.emitUnknownPacket(packet) }); | ||
@@ -115,3 +113,3 @@ events.EventEmitter.call(this); | ||
AbraxasSocket.prototype.destroy = function () { | ||
if (this.connection) this.connection.destroy(); | ||
if (this.connection instanceof net.Socket) this.connection.destroy(); | ||
this.disconnect(); | ||
@@ -118,0 +116,0 @@ } |
@@ -24,3 +24,3 @@ "use strict"; | ||
var self = this; | ||
this.packets.acceptDefault('NO_JOB', function(data) { | ||
this.packets.accept('NO_JOB', function(data) { | ||
if (!self.socket) return; | ||
@@ -30,3 +30,3 @@ self.socket.write({kind:'request',type:packet.types['PRE_SLEEP']}); | ||
this.packets.acceptDefault('NOOP', function(data) { self.askForWork() }); | ||
this.packets.accept('NOOP', function(data) { self.askForWork() }); | ||
} | ||
@@ -73,3 +73,3 @@ var Worker = exports.Worker = {}; | ||
if (-- this._workersCount == 0) { | ||
this.packets.removeListener('JOB_ASSIGN_UNIQ', this.onJobAssign); | ||
this.packets.removeHandler('JOB_ASSIGN_UNIQ', this.onJobAssign); | ||
this.unref(); | ||
@@ -90,3 +90,3 @@ } | ||
this.ref(); | ||
this.packets.on('JOB_ASSIGN_UNIQ', this.onJobAssign = function(job) { self.dispatchWorker(job) }); | ||
this.packets.accept('JOB_ASSIGN_UNIQ', this.onJobAssign = function(job) { self.dispatchWorker(job) }); | ||
} | ||
@@ -127,3 +127,3 @@ } | ||
this._workersCount = 0; | ||
this.packets.removeListener('JOB_ASSIGN_UNIQ', this.onJobAssign); | ||
this.packets.removeHandler('JOB_ASSIGN_UNIQ', this.onJobAssign); | ||
this.unref(); | ||
@@ -130,0 +130,0 @@ if (!this.connected) return; |
Filesystem access
Supply chain riskAccesses the file system, and could potentially read sensitive data.
Found 1 instance in 1 package
0
107231
39
2176