worker-farm
Advanced tools
Comparing version 0.2.1 to 0.3.0
var $module | ||
/* | ||
var contextProto = this.context; | ||
while (contextProto = Object.getPrototypeOf(contextProto)) { | ||
completionGroups.push(Object.getOwnPropertyNames(contextProto)); | ||
} | ||
*/ | ||
function handle (data) { | ||
@@ -9,7 +16,12 @@ var idx = data.idx | ||
, callback = function () { | ||
process.send({ | ||
idx : idx | ||
, child : child | ||
, args : Array.prototype.slice.call(arguments) | ||
}) | ||
var _args = Array.prototype.slice.call(arguments) | ||
if (_args[0] instanceof Error) { | ||
_args[0] = { | ||
'$error' : '$error' | ||
, 'type' : _args[0].constructor.name | ||
, 'message' : _args[0].message | ||
, 'stack' : _args[0].stack | ||
} | ||
} | ||
process.send({ idx: idx, child: child, args: _args }) | ||
} | ||
@@ -16,0 +28,0 @@ , exec |
@@ -5,12 +5,16 @@ const DEFAULT_OPTIONS = { | ||
, maxConcurrentCallsPerWorker : 10 | ||
, maxConcurrentCalls : -1 | ||
, maxCallTime : 0 // exceed this and the whole worker is terminated | ||
, forcedKillTime : 100 | ||
} | ||
const extend = require('xtend') | ||
, TimeoutError = require('errno').create('TimeoutError') | ||
, fork = require('./fork') | ||
const extend = require('xtend') | ||
, fork = require('./fork') | ||
, TimeoutError = require('errno').create('TimeoutError') | ||
, MaxConcurrentCallsError = require('errno').create('MaxConcurrentCallsError') | ||
function Farm (options, path) { | ||
this.options = extend(DEFAULT_OPTIONS, options) | ||
this.path = path | ||
this.options = extend(DEFAULT_OPTIONS, options) | ||
this.path = path | ||
this.activeCalls = 0 | ||
} | ||
@@ -22,2 +26,8 @@ | ||
var args = Array.prototype.slice.call(arguments) | ||
if (this.options.maxConcurrentCalls > 0 && this.activeCalls >= this.options.maxConcurrentCalls) { | ||
var err = new MaxConcurrentCallsError('Too many concurrent calls (' + this.activeCalls + ')') | ||
if (typeof args[args.length - 1] == 'function') | ||
return process.nextTick(args[args.length - 1].bind(null, err)) | ||
throw err | ||
} | ||
this.addCall({ | ||
@@ -72,15 +82,34 @@ method : method | ||
Farm.prototype.startChild = function () { | ||
this.children[++this.childId] = { | ||
send : fork(this.path, this.receive.bind(this), this.onExit.bind(this, this.childId)) | ||
, calls : [] | ||
, activeCalls : 0 | ||
} | ||
this.childId++ | ||
var forked = fork(this.path) | ||
, id = this.childId | ||
, c = { | ||
send : forked.send | ||
, child : forked.child | ||
, calls : [] | ||
, activeCalls : 0 | ||
, exitCode : null | ||
} | ||
forked.child.on('message', this.receive.bind(this)) | ||
forked.child.once('exit', function (code) { | ||
c.exitCode = code | ||
this.onExit(id) | ||
}.bind(this)) | ||
this.activeChildren++ | ||
this.children[id] = c | ||
} | ||
// stop a worker, identified by id | ||
Farm.prototype.stopChild = function (child) { | ||
if (this.children[child]) { | ||
this.children[child].send('die') | ||
;delete this.children[child] | ||
Farm.prototype.stopChild = function (childId) { | ||
var child = this.children[childId] | ||
if (child) { | ||
child.send('die') | ||
setTimeout(function () { | ||
if (child.exitCode === null) | ||
child.child.kill('SIGKILL') | ||
}, this.options.forcedKillTime) | ||
;delete this.children[childId] | ||
this.activeChildren-- | ||
@@ -118,2 +147,17 @@ } | ||
if (args[0] && args[0].$error == '$error') { | ||
var e = args[0] | ||
switch (e.type) { | ||
case 'TypeError': args[0] = new TypeError(e.message); break | ||
case 'RangeError': args[0] = new RangeError(e.message); break | ||
case 'EvalError': args[0] = new EvalError(e.message); break | ||
case 'ReferenceError': args[0] = new ReferenceError(e.message); break | ||
case 'SyntaxError': args[0] = new SyntaxError(e.message); break | ||
case 'URIError': args[0] = new URIError(e.message); break | ||
default: args[0] = new Error(e.message) | ||
} | ||
args[0].type = e.type | ||
args[0].stack = e.stack | ||
} | ||
process.nextTick(function () { | ||
@@ -125,2 +169,3 @@ call.callback.apply(null, args) | ||
child.activeCalls-- | ||
this.activeCalls-- | ||
@@ -162,2 +207,3 @@ if (this.options.maxCallsPerWorker != -1 | ||
child.activeCalls++ | ||
this.activeCalls++ | ||
@@ -265,2 +311,3 @@ child.send({ | ||
module.exports = Farm | ||
module.exports = Farm | ||
module.exports.TimeoutError = TimeoutError |
const childProcess = require('child_process') | ||
, childModule = require.resolve('./child/index') | ||
function fork (forkModule, receive, onExit) { | ||
function fork (forkModule) { | ||
var child = childProcess.fork(childModule, { | ||
@@ -10,13 +10,14 @@ env: process.env | ||
child.on('message', receive) | ||
child.on('exit', onExit) | ||
child.send({ module: forkModule }) | ||
// return a send() function for this child | ||
return function (data) { | ||
try { | ||
child.send(data) | ||
} catch (e) { | ||
// this *should* be picked up by onExit and the operation requeued | ||
} | ||
return { | ||
send : function (data) { | ||
try { | ||
child.send(data) | ||
} catch (e) { | ||
// this *should* be picked up by onExit and the operation requeued | ||
} | ||
} | ||
, child : child | ||
} | ||
@@ -23,0 +24,0 @@ } |
{ | ||
"name" : "worker-farm" | ||
, "description" : "Distribute processing tasks to child processes with an über-simple API and baked-in durability & custom concurrency options." | ||
, "version" : "0.2.1" | ||
, "version" : "0.3.0" | ||
, "homepage" : "https://github.com/rvagg/node-worker-farm" | ||
@@ -6,0 +6,0 @@ , "authors" : [ |
@@ -109,2 +109,3 @@ # Worker Farm [![Build Status](https://secure.travis-ci.org/rvagg/node-worker-farm.png)](http://travis-ci.org/rvagg/node-worker-farm) | ||
, maxConcurrentCallsPerWorker : 10 | ||
, maxConcurrentCalls : -1 | ||
, maxCallTime : 0 | ||
@@ -120,2 +121,4 @@ } | ||
* **<code>maxConcurrentCalls</code>** allows you to control the maximum number of calls in the queue—either actively being processed or waiting for a worker to be processed. `-1` indicates no limit but if you have conditions that may endlessly queue jobs and you need to set a limit then provide a `>0` value and any calls that push the limit will return on their callback with a `MaxConcurrentCallsError` error (check `err.type == 'MaxConcurrentCallsError'`). | ||
* **<code>maxCallTime</code>** *(use with caution, understand what this does before you use it!)* when `> 0`, will cap a time, in milliseconds, that *any single call* can take to execute in a worker. If this time limit is exceeded by just a single call then the worker running that call will be killed and any calls running on that worker will have their callbacks returned with a `TimeoutError` (check `err.type == 'TimeoutError'`). If you are running with `'maxConcurrentCallsperWorker'` value greater than `1` then **all calls currently executing** will fail and not be automatically resubmitted. Use this if you have jobs that may potentially end in infinite loops that you can't programatically end with your child code. Preferably run this with a `'maxConcurrentCallsperWorker'` so you don't interrupt other calls when you have a timeout. This timeout operates on a per-call basis but will interrupt a whole worker. | ||
@@ -122,0 +125,0 @@ |
@@ -16,2 +16,12 @@ module.exports = function (timeout, callback) { | ||
callback(null, id, process.pid) | ||
} | ||
module.exports.err = function (type, message, callback) { | ||
if (type == 'TypeError') | ||
return callback(new TypeError(message)) | ||
callback(new Error(message)) | ||
} | ||
module.exports.block = function () { | ||
while (true); | ||
} |
@@ -22,3 +22,3 @@ var tape = require('tape') | ||
t.ok(pid < process.pid + 500, 'pid makes sense') | ||
t.ok(rnd > 0 && rnd < 1, 'rnd result makes sense') | ||
t.ok(rnd >= 0 && rnd < 1, 'rnd result makes sense') | ||
}) | ||
@@ -37,7 +37,5 @@ | ||
child.run0(function (err, pid, rnd) { | ||
t.ok(pid > process.pid, 'pid makes sense') | ||
t.ok(pid < process.pid + 500, 'pid makes sense') | ||
t.ok(rnd > 0 && rnd < 1, 'rnd result makes sense') | ||
t.ok(rnd >= 0 && rnd < 1, 'rnd result makes sense') | ||
}) | ||
@@ -183,3 +181,3 @@ | ||
var time = Date.now() - start | ||
t.ok(time > 100 && time < 175, 'processed tasks concurrently (' + time + 'ms)') | ||
t.ok(time > 100 && time < 200, 'processed tasks concurrently (' + time + 'ms)') | ||
workerFarm.end(child, function () { | ||
@@ -278,3 +276,3 @@ t.ok(true, 'workerFarm ended') | ||
t.ok(pid < process.pid + 500, 'pid makes sense ' + pid + ' vs ' + process.pid) | ||
t.ok(rnd > 0 && rnd < 1, 'rnd result makes sense ' + rnd) | ||
t.ok(rnd >= 0 && rnd < 1, 'rnd result makes sense') | ||
}) | ||
@@ -345,1 +343,62 @@ | ||
}) | ||
tape('test error passing', function (t) { | ||
t.plan(7) | ||
var child = workerFarm(childPath, [ 'err' ]) | ||
child.err('Error', 'this is an Error', function (err) { | ||
t.ok(err instanceof Error, 'is an Error object') | ||
t.equal('Error', err.type, 'correct type') | ||
t.equal('this is an Error', err.message, 'correct message') | ||
}) | ||
child.err('TypeError', 'this is a TypeError', function (err) { | ||
t.ok(err instanceof Error, 'is a TypeError object') | ||
t.equal('TypeError', err.type, 'correct type') | ||
t.equal('this is a TypeError', err.message, 'correct message') | ||
}) | ||
workerFarm.end(child, function () { | ||
t.ok(true, 'workerFarm ended') | ||
}) | ||
}) | ||
tape('test maxConcurrentCalls', function (t) { | ||
t.plan(10) | ||
var child = workerFarm({ maxConcurrentCalls: 5 }, childPath) | ||
child(50, function (err) { t.notOk(err, 'no error') }) | ||
child(50, function (err) { t.notOk(err, 'no error') }) | ||
child(50, function (err) { t.notOk(err, 'no error') }) | ||
child(50, function (err) { t.notOk(err, 'no error') }) | ||
child(50, function (err) { t.notOk(err, 'no error') }) | ||
child(50, function (err) { | ||
t.ok(err) | ||
t.equal(err.type, 'MaxConcurrentCallsError', 'correct error type') | ||
}) | ||
child(50, function (err) { | ||
t.ok(err) | ||
t.equal(err.type, 'MaxConcurrentCallsError', 'correct error type') | ||
}) | ||
workerFarm.end(child, function () { | ||
t.ok(true, 'workerFarm ended') | ||
}) | ||
}) | ||
// this test should not keep the process running! if the test process | ||
// doesn't die then the problem is here | ||
tape('test timeout kill', function (t) { | ||
t.plan(3) | ||
var child = workerFarm({ maxCallTime: 250, maxConcurrentWorkers: 1 }, childPath, [ 'block' ]) | ||
child.block(function (err) { | ||
t.ok(err, 'got an error') | ||
t.equal(err.type, 'TimeoutError', 'correct error type') | ||
}) | ||
workerFarm.end(child, function () { | ||
t.ok(true, 'workerFarm ended') | ||
}) | ||
}) |
38647
759
136