worker-farm
Advanced tools
Comparing version 0.0.1 to 0.1.0
var $module | ||
var handle = function (data) { | ||
var idx = data.idx | ||
, child = data.child | ||
, method = data.method | ||
, args = data.args | ||
, callback = function () { | ||
process.send({ | ||
idx : idx | ||
, child : child | ||
, args : Array.prototype.slice.call(arguments) | ||
}) | ||
} | ||
, exec | ||
function handle (data) { | ||
var idx = data.idx | ||
, child = data.child | ||
, method = data.method | ||
, args = data.args | ||
, callback = function () { | ||
process.send({ | ||
idx : idx | ||
, child : child | ||
, args : Array.prototype.slice.call(arguments) | ||
}) | ||
} | ||
, exec | ||
if (method == null && typeof $module == 'function') | ||
exec = $module | ||
else if (typeof $module[method] == 'function') | ||
exec = $module[method] | ||
if (method == null && typeof $module == 'function') | ||
exec = $module | ||
else if (typeof $module[method] == 'function') | ||
exec = $module[method] | ||
if (!exec) | ||
return console.error('NO SUCH METHOD:', method) | ||
if (!exec) | ||
return console.error('NO SUCH METHOD:', method) | ||
exec.apply(null, args.concat([ callback ])) | ||
} | ||
exec.apply(null, args.concat([ callback ])) | ||
} | ||
@@ -28,0 +28,0 @@ process.on('message', function (data) { |
363
lib/farm.js
@@ -1,196 +0,213 @@ | ||
const fork = require('./fork') | ||
const DEFAULT_OPTIONS = { | ||
maxCallsPerWorker : -1 | ||
, maxConcurrentWorkers : require('os').cpus().length | ||
, maxConcurrentCallsPerWorker : 10 | ||
} | ||
var Farm = { | ||
// make a handle to pass back in the form of an external API | ||
mkhandle : function (method) { | ||
return function () { | ||
var args = Array.prototype.slice.call(arguments) | ||
this.addCall({ | ||
method : method | ||
, callback : args.pop() | ||
, args : args | ||
}) | ||
}.bind(this) | ||
} | ||
const extend = require('xtend') | ||
, fork = require('./fork') | ||
// a constructor of sorts | ||
, setup: function (methods) { | ||
var iface | ||
if (!methods) { // single-function export | ||
iface = this.mkhandle() | ||
} else { // multiple functions on the export | ||
iface = {} | ||
methods.forEach(function (m) { | ||
iface[m] = this.mkhandle(m) | ||
}.bind(this)) | ||
} | ||
function Farm (options, path) { | ||
this.options = extend(DEFAULT_OPTIONS, options) | ||
this.path = path | ||
} | ||
this.searchStart = -1 | ||
this.childId = -1 | ||
this.children = {} | ||
this.activeChildren = 0 | ||
this.callQueue = [] | ||
// make a handle to pass back in the form of an external API | ||
Farm.prototype.mkhandle = function (method) { | ||
return function () { | ||
var args = Array.prototype.slice.call(arguments) | ||
this.addCall({ | ||
method : method | ||
, callback : args.pop() | ||
, args : args | ||
}) | ||
}.bind(this) | ||
} | ||
return iface | ||
} | ||
// a constructor of sorts | ||
Farm.prototype.setup = function (methods) { | ||
var iface | ||
if (!methods) { // single-function export | ||
iface = this.mkhandle() | ||
} else { // multiple functions on the export | ||
iface = {} | ||
methods.forEach(function (m) { | ||
iface[m] = this.mkhandle(m) | ||
}.bind(this)) | ||
} | ||
// when a child exits, check if there are any outstanding jobs and requeue them | ||
, onExit: function (childId) { | ||
// delay this to give any sends a chance to finish | ||
setTimeout(function () { | ||
var doQueue = false | ||
if (this.children[childId] && this.children[childId].activeCalls) { | ||
this.children[childId].calls.reverse().forEach(function (call) { | ||
if (call) { | ||
this.callQueue.unshift(call) | ||
doQueue = true | ||
} | ||
}.bind(this)) | ||
} | ||
this.stopChild(childId) | ||
doQueue && this.processQueue() | ||
}.bind(this), 10) | ||
} | ||
this.searchStart = -1 | ||
this.childId = -1 | ||
this.children = {} | ||
this.activeChildren = 0 | ||
this.callQueue = [] | ||
// start a new worker | ||
, startChild: function () { | ||
this.children[++this.childId] = { | ||
send : fork(this.path, this.receive.bind(this), this.onExit.bind(this, this.childId)) | ||
, calls : [] | ||
, activeCalls : 0 | ||
} | ||
this.activeChildren++ | ||
} | ||
return iface | ||
} | ||
// stop a worker, identified by id | ||
, stopChild: function (child) { | ||
if (this.children[child]) { | ||
this.children[child].send('die') | ||
;delete this.children[child] | ||
this.activeChildren-- | ||
} | ||
// when a child exits, check if there are any outstanding jobs and requeue them | ||
Farm.prototype.onExit = function (childId) { | ||
// delay this to give any sends a chance to finish | ||
setTimeout(function () { | ||
var doQueue = false | ||
if (this.children[childId] && this.children[childId].activeCalls) { | ||
this.children[childId].calls.reverse().forEach(function (call) { | ||
if (call) { | ||
this.callQueue.unshift(call) | ||
doQueue = true | ||
} | ||
}.bind(this)) | ||
} | ||
this.stopChild(childId) | ||
doQueue && this.processQueue() | ||
}.bind(this), 10) | ||
} | ||
// called from a child process, the data contains information needed to | ||
// look up the child and the original call so we can invoke the callback | ||
, receive: function (data) { | ||
var idx = data.idx | ||
, child = data.child | ||
, args = data.args | ||
// start a new worker | ||
Farm.prototype.startChild = function () { | ||
this.children[++this.childId] = { | ||
send : fork(this.path, this.receive.bind(this), this.onExit.bind(this, this.childId)) | ||
, calls : [] | ||
, activeCalls : 0 | ||
} | ||
this.activeChildren++ | ||
} | ||
if (this.children[child]) { | ||
if (this.children[child].calls[idx]) { | ||
this.children[child].calls[idx].callback.apply(null, args) | ||
;delete this.children[child].calls[idx] | ||
this.children[child].activeCalls-- | ||
if (this.options.maxCallsPerWorker != -1 | ||
&& this.children[child].calls.length >= this.options.maxCallsPerWorker | ||
&& !Object.keys(this.children[child].calls).length) { | ||
// this child has finished its run, kill it | ||
this.stopChild(child) | ||
} | ||
// allow any outstanding calls to be processed | ||
this.processQueue() | ||
} else | ||
console.error( | ||
'Worker Farm: Received message for unknown index for existing child. ' | ||
+ 'This should not happen!' | ||
) | ||
} else | ||
console.error( | ||
'Worker Farm: Received message for unknown child. ' | ||
+ 'This is likely as a result of premature child death, the operation will have been re-queued.' | ||
) | ||
} | ||
// stop a worker, identified by id | ||
Farm.prototype.stopChild = function (child) { | ||
if (this.children[child]) { | ||
this.children[child].send('die') | ||
;delete this.children[child] | ||
this.activeChildren-- | ||
} | ||
} | ||
// send a call to a worker, identified by id | ||
, send: function (child, call) { | ||
this.children[child].calls.push(call) | ||
this.children[child].activeCalls++ | ||
this.children[child].send({ | ||
idx : this.children[child].calls.length - 1 | ||
, child : child | ||
, method : call.method | ||
, args : call.args | ||
}) | ||
} | ||
// called from a child process, the data contains information needed to | ||
// look up the child and the original call so we can invoke the callback | ||
Farm.prototype.receive = function (data) { | ||
var idx = data.idx | ||
, child = data.child | ||
, args = data.args | ||
// a list of active worker ids, in order, but the starting offset is | ||
// shifted each time this method is called, so we work our way through | ||
// all workers when handing out jobs | ||
, childKeys: function () { | ||
var cka = Object.keys(this.children) | ||
, cks | ||
if (this.searchStart >= cka.length - 1) this.searchStart = 0 | ||
else this.searchStart++ | ||
cks = cka.splice(0, this.searchStart) | ||
return cka.concat(cks) | ||
} | ||
if (this.children[child]) { | ||
if (this.children[child].calls[idx]) { | ||
this.children[child].calls[idx].callback.apply(null, args) | ||
;delete this.children[child].calls[idx] | ||
this.children[child].activeCalls-- | ||
if (this.options.maxCallsPerWorker != -1 | ||
&& this.children[child].calls.length >= this.options.maxCallsPerWorker | ||
&& !Object.keys(this.children[child].calls).length) { | ||
// this child has finished its run, kill it | ||
this.stopChild(child) | ||
} | ||
// allow any outstanding calls to be processed | ||
this.processQueue() | ||
} else | ||
console.error( | ||
'Worker Farm: Received message for unknown index for existing child. ' | ||
+ 'This should not happen!' | ||
) | ||
} else | ||
console.error( | ||
'Worker Farm: Received message for unknown child. ' | ||
+ 'This is likely as a result of premature child death, the operation will have been re-queued.' | ||
) | ||
} | ||
// Calls are added to a queue, this processes the queue and is called | ||
// whenever there might be a chance to send more calls to the workers. | ||
// The various options all impact on when we're able to send calls, | ||
// they may need to be kept in a queue until a worker is ready. | ||
, processQueue: function () { | ||
var cka, i = 0 | ||
// send a call to a worker, identified by id | ||
Farm.prototype.send = function (child, call) { | ||
this.children[child].calls.push(call) | ||
this.children[child].activeCalls++ | ||
this.children[child].send({ | ||
idx : this.children[child].calls.length - 1 | ||
, child : child | ||
, method : call.method | ||
, args : call.args | ||
}) | ||
} | ||
if (!this.callQueue.length) { | ||
if (this.ending) this.end() | ||
return | ||
} | ||
// a list of active worker ids, in order, but the starting offset is | ||
// shifted each time this method is called, so we work our way through | ||
// all workers when handing out jobs | ||
Farm.prototype.childKeys = function () { | ||
var cka = Object.keys(this.children) | ||
, cks | ||
if (this.activeChildren < this.options.maxConcurrentWorkers) | ||
this.startChild() | ||
if (this.searchStart >= cka.length - 1) | ||
this.searchStart = 0 | ||
else | ||
this.searchStart++ | ||
for (cka = this.childKeys(); i < cka.length; i++) { | ||
if ((this.options.maxConcurrentCallsPerWorker == -1 | ||
|| this.children[cka[i]].activeCalls < this.options.maxConcurrentCallsPerWorker) | ||
&& (this.options.maxCallsPerWorker == -1 | ||
|| this.children[cka[i]].calls.length < this.options.maxCallsPerWorker)) { | ||
cks = cka.splice(0, this.searchStart) | ||
this.send(cka[i], this.callQueue.shift()) | ||
if (!this.callQueue.length) { | ||
if (this.ending) this.end() | ||
return | ||
} | ||
} /*else { | ||
console.log( | ||
this.options.maxConcurrentCallsPerWorker == -1 | ||
, this.children[cka[i]].activeCalls < this.options.maxConcurrentCallsPerWorker | ||
, this.options.maxCallsPerWorker == -1 | ||
, this.children[cka[i]].calls.length < this.options.maxCallsPerWorker) | ||
}*/ | ||
} | ||
return cka.concat(cks) | ||
} | ||
if (this.ending) this.end() | ||
} | ||
// Calls are added to a queue, this processes the queue and is called | ||
// whenever there might be a chance to send more calls to the workers. | ||
// The various options all impact on when we're able to send calls, | ||
// they may need to be kept in a queue until a worker is ready. | ||
Farm.prototype.processQueue = function () { | ||
var cka, i = 0 | ||
// add a new call to the call queue, then trigger a process of the queue | ||
, addCall: function (call) { | ||
if (this.ending) return this.end() // don't add anything new to the queue | ||
this.callQueue.push(call) | ||
this.processQueue() | ||
} | ||
if (!this.callQueue.length) | ||
return this.ending && this.end() | ||
// kills child workers when they're all done | ||
, end: function (callback) { | ||
var complete = true | ||
if (this.ending === false) return | ||
if (callback) this.ending = callback | ||
else if (this.ending == null) this.ending = true | ||
Object.keys(this.children).forEach(function (child) { | ||
if (!this.children[child]) return | ||
if (!this.children[child].activeCalls) | ||
this.stopChild(child) | ||
else | ||
complete = false | ||
}.bind(this)) | ||
if (this.activeChildren < this.options.maxConcurrentWorkers) | ||
this.startChild() | ||
if (complete && typeof this.ending == 'function') { | ||
this.ending() | ||
this.ending = false | ||
} | ||
} | ||
} | ||
for (cka = this.childKeys(); i < cka.length; i++) { | ||
if ((this.options.maxConcurrentCallsPerWorker == -1 | ||
|| this.children[cka[i]].activeCalls < this.options.maxConcurrentCallsPerWorker) | ||
&& (this.options.maxCallsPerWorker == -1 | ||
|| this.children[cka[i]].calls.length < this.options.maxCallsPerWorker)) { | ||
this.send(cka[i], this.callQueue.shift()) | ||
if (!this.callQueue.length) | ||
return this.ending && this.end() | ||
} /*else { | ||
console.log( | ||
this.options.maxConcurrentCallsPerWorker == -1 | ||
, this.children[cka[i]].activeCalls < this.options.maxConcurrentCallsPerWorker | ||
, this.options.maxCallsPerWorker == -1 | ||
, this.children[cka[i]].calls.length < this.options.maxCallsPerWorker) | ||
}*/ | ||
} | ||
if (this.ending) | ||
this.end() | ||
} | ||
// add a new call to the call queue, then trigger a process of the queue | ||
Farm.prototype.addCall = function (call) { | ||
if (this.ending) | ||
return this.end() // don't add anything new to the queue | ||
this.callQueue.push(call) | ||
this.processQueue() | ||
} | ||
// kills child workers when they're all done | ||
Farm.prototype.end = function (callback) { | ||
var complete = true | ||
if (this.ending === false) | ||
return | ||
if (callback) | ||
this.ending = callback | ||
else if (this.ending == null) | ||
this.ending = true | ||
Object.keys(this.children).forEach(function (child) { | ||
if (!this.children[child]) | ||
return | ||
if (!this.children[child].activeCalls) | ||
this.stopChild(child) | ||
else | ||
complete = false | ||
}.bind(this)) | ||
if (complete && typeof this.ending == 'function') { | ||
this.ending() | ||
this.ending = false | ||
} | ||
} | ||
module.exports = Farm |
const childProcess = require('child_process') | ||
, childModule = require.resolve('./child/index') | ||
var fork = function (forkModule, receive, onExit) { | ||
var child = childProcess.fork(childModule, { | ||
env: process.env | ||
, cwd: process.cwd() | ||
}) | ||
function fork (forkModule, receive, onExit) { | ||
var child = childProcess.fork(childModule, { | ||
env: process.env | ||
, cwd: process.cwd() | ||
}) | ||
child.on('message', receive) | ||
child.on('exit', onExit) | ||
child.send({ module: forkModule }) | ||
// return a send() function for this child | ||
return function (data) { | ||
try { | ||
child.send(data) | ||
} catch (e) { | ||
// this *should* be picked up by onExit and the operation requeued | ||
} | ||
} | ||
child.on('message', receive) | ||
child.on('exit', onExit) | ||
child.send({ module: forkModule }) | ||
// return a send() function for this child | ||
return function (data) { | ||
try { | ||
child.send(data) | ||
} catch (e) { | ||
// this *should* be picked up by onExit and the operation requeued | ||
} | ||
} | ||
} | ||
module.exports = fork |
@@ -1,40 +0,29 @@ | ||
const DEFAULT_OPTIONS = { | ||
maxCallsPerWorker : -1 | ||
, maxConcurrentWorkers : require('os').cpus().length | ||
, maxConcurrentCallsPerWorker : 10 | ||
} | ||
, extend = require('util')._extend | ||
, Farm = require('./farm') | ||
, farms = [] // keep record of farms so we can end() them if required | ||
const Farm = require('./farm') | ||
var farm = function (options, path, methods) { | ||
var farm | ||
var farms = [] // keep record of farms so we can end() them if required | ||
if (typeof options == 'string') { | ||
methods = path | ||
path = options | ||
options = {} | ||
} | ||
function farm (options, path, methods) { | ||
if (typeof options == 'string') { | ||
methods = path | ||
path = options | ||
options = {} | ||
} | ||
options = extend(Object.create(DEFAULT_OPTIONS), options) | ||
var f = new Farm(options, path) | ||
, api = f.setup(methods) | ||
farm = Object.create(Farm, { | ||
options : { writable: false, value: options } | ||
, path : { writable: false, value: path } | ||
}) | ||
farms.push({ farm: f, api: api }) | ||
farms.push({ farm: farm, api: farm.setup(methods) }) | ||
// return the public API | ||
return api | ||
} | ||
// return the public API | ||
return farms[farms.length - 1].api | ||
} | ||
function end (api, callback) { | ||
for (var i = 0; i < farms.length; i++) | ||
if (farms[i] && farms[i].api === api) | ||
return farms[i].farm.end(callback) | ||
process.nextTick(callback.bind(null, 'Worker farm not found!')) | ||
} | ||
, end = function (api, callback) { | ||
for (var i = 0; i < farms.length; i++) | ||
if (farms[i] && farms[i].api === api) | ||
return farms[i].farm.end(callback) | ||
process.nextTick(callback.bind(null, 'Worker farm not found!')) | ||
} | ||
module.exports = farm | ||
module.exports.end = end |
{ | ||
"name" : "worker-farm" | ||
, "description" : "Distribute processing tasks to child processes with an über-simple API and baked-in durability & custom concurrency options." | ||
, "version" : "0.0.1" | ||
, "version" : "0.1.0" | ||
, "homepage" : "https://github.com/rvagg/node-worker-farm" | ||
@@ -16,2 +16,3 @@ , "authors" : [ | ||
, "dependencies" : { | ||
"xtend" : "~2.0.6" | ||
} | ||
@@ -18,0 +19,0 @@ , "devDependencies" : { |
# Worker Farm [![Build Status](https://secure.travis-ci.org/rvagg/node-worker-farm.png)](http://travis-ci.org/rvagg/node-worker-farm) | ||
[![NPM](https://nodei.co/npm/worker-farm.png?downloads=true&stars=true)](https://nodei.co/npm/worker-farm/) [![NPM](https://nodei.co/npm-dl/worker-farm.png?months=6)](https://nodei.co/npm/worker-farm/) | ||
Distribute processing tasks to child processes with an über-simple API and baked-in durability & custom concurrency options. *Available in npm as <strong>worker-farm</strong>*. | ||
@@ -119,2 +122,4 @@ | ||
Any calls that are queued and not yet being handled by a child process will be discarded. `end()` only waits for those currently in progress. | ||
Once you end a farm, it won't handle any more calls, so don't even try! | ||
@@ -121,0 +126,0 @@ |
@@ -21,3 +21,3 @@ var tape = require('tape') | ||
t.ok(pid > process.pid, 'pid makes sense') | ||
t.ok(pid < process.pid + 100, 'pid makes sense') | ||
t.ok(pid < process.pid + 250, 'pid makes sense') | ||
t.ok(rnd > 0 && rnd < 1, 'rnd result makes sense') | ||
@@ -36,3 +36,3 @@ }) | ||
t.ok(pid > process.pid, 'pid makes sense') | ||
t.ok(pid < process.pid + 100, 'pid makes sense') | ||
t.ok(pid < process.pid + 250, 'pid makes sense') | ||
t.ok(rnd > 0 && rnd < 1, 'rnd result makes sense') | ||
@@ -246,5 +246,5 @@ }) | ||
child(0, function (err, pid, rnd) { | ||
t.ok(pid > process.pid, 'pid makes sense') | ||
t.ok(pid < process.pid + 100, 'pid makes sense') | ||
t.ok(rnd > 0 && rnd < 1, 'rnd result makes sense') | ||
t.ok(pid > process.pid, 'pid makes sense ' + pid + ' vs ' + process.pid) | ||
t.ok(pid < process.pid + 250, 'pid makes sense ' + pid + ' vs ' + process.pid) | ||
t.ok(rnd > 0 && rnd < 1, 'rnd result makes sense ' + rnd) | ||
}) | ||
@@ -254,3 +254,3 @@ | ||
t.pass('an .end() callback was successfully called') | ||
}); | ||
}) | ||
}) |
Sorry, the diff of this file is not supported yet
536
130
29629
1
+ Addedxtend@~2.0.6
+ Addedforeach@2.0.6(transitive)
+ Addedindexof@0.0.1(transitive)
+ Addedis@0.2.7(transitive)
+ Addedis-object@0.1.2(transitive)
+ Addedobject-keys@0.2.0(transitive)
+ Addedxtend@2.0.6(transitive)