worker-farm
Advanced tools
Comparing version 1.6.0 to 1.7.0
@@ -1,16 +0,14 @@ | ||
interface Workers { | ||
(callback: WorkerCallback): void; | ||
(arg1: any, callback: WorkerCallback): void; | ||
(arg1: any, arg2: any, callback: WorkerCallback): void; | ||
(arg1: any, arg2: any, arg3: any, callback: WorkerCallback): void; | ||
(arg1: any, arg2: any, arg3: any, arg4: any, callback: WorkerCallback): void; | ||
} | ||
import { ForkOptions } from "child_process"; | ||
type WorkerCallback = | ||
| WorkerCallback0 | ||
| WorkerCallback1 | ||
| WorkerCallback2 | ||
| WorkerCallback3 | ||
| WorkerCallback4; | ||
export = Farm; | ||
declare function Farm(name: string): Farm.Workers; | ||
declare function Farm(name: string, exportedMethods: string[]): Farm.Workers; | ||
declare function Farm(options: Farm.FarmOptions, name: string): Farm.Workers; | ||
declare function Farm( | ||
options: Farm.FarmOptions, | ||
name: string, | ||
exportedMethods: string[], | ||
): Farm.Workers; | ||
type WorkerCallback0 = () => void; | ||
@@ -22,24 +20,37 @@ type WorkerCallback1 = (arg1: any) => void; | ||
interface FarmOptions { | ||
maxCallsPerWorker?: number | ||
maxConcurrentWorkers?: number | ||
maxConcurrentCallsPerWorker?: number | ||
maxConcurrentCalls?: number | ||
maxCallTime?: number | ||
maxRetries?: number | ||
autoStart?: boolean | ||
} | ||
declare namespace Farm { | ||
export function end(workers: Workers, callback?: Function): void; | ||
interface WorkerFarm { | ||
(name: string): Workers; | ||
(name: string, exportedMethods: string[]): Workers; | ||
(options: FarmOptions, name: string): Workers; | ||
(options: FarmOptions, name: string, exportedMethods: string[]): Workers; | ||
export interface Workers { | ||
[x: string]: Workers, | ||
(callback: WorkerCallback): void; | ||
(arg1: any, callback: WorkerCallback): void; | ||
(arg1: any, arg2: any, callback: WorkerCallback): void; | ||
(arg1: any, arg2: any, arg3: any, callback: WorkerCallback): void; | ||
( | ||
arg1: any, | ||
arg2: any, | ||
arg3: any, | ||
arg4: any, | ||
callback: WorkerCallback, | ||
): void; | ||
} | ||
end: (workers: Workers) => void; | ||
} | ||
export interface FarmOptions { | ||
maxCallsPerWorker?: number; | ||
maxConcurrentWorkers?: number; | ||
maxConcurrentCallsPerWorker?: number; | ||
maxConcurrentCalls?: number; | ||
maxCallTime?: number; | ||
maxRetries?: number; | ||
autoStart?: boolean; | ||
workerOptions?: ForkOptions; | ||
} | ||
declare module "worker-farm" { | ||
const workerFarm: WorkerFarm; | ||
export = workerFarm; | ||
export type WorkerCallback = | ||
| WorkerCallback0 | ||
| WorkerCallback1 | ||
| WorkerCallback2 | ||
| WorkerCallback3 | ||
| WorkerCallback4; | ||
} |
@@ -32,3 +32,3 @@ 'use strict' | ||
} | ||
process.send({ idx: idx, child: child, args: _args }) | ||
process.send({ owner: 'farm', idx: idx, child: child, args: _args }) | ||
} | ||
@@ -50,5 +50,9 @@ , exec | ||
process.on('message', function (data) { | ||
if (data.owner !== 'farm') { | ||
return; | ||
} | ||
if (!$module) return $module = require(data.module) | ||
if (data == 'die') return process.exit(0) | ||
if (data.event == 'die') return process.exit(0) | ||
handle(data) | ||
}) |
@@ -13,2 +13,3 @@ 'use strict' | ||
, autoStart : false | ||
, onChild : function() {} | ||
} | ||
@@ -33,4 +34,4 @@ | ||
let args = Array.prototype.slice.call(arguments) | ||
if (this.activeCalls >= this.options.maxConcurrentCalls) { | ||
let err = new MaxConcurrentCallsError('Too many concurrent calls (' + this.activeCalls + ')') | ||
if (this.activeCalls + this.callQueue.length >= this.options.maxConcurrentCalls) { | ||
let err = new MaxConcurrentCallsError('Too many concurrent calls (active: ' + this.activeCalls + ', queued: ' + this.callQueue.length + ')') | ||
if (typeof args[args.length - 1] == 'function') | ||
@@ -118,3 +119,10 @@ return process.nextTick(args[args.length - 1].bind(null, err)) | ||
forked.child.on('message', this.receive.bind(this)) | ||
this.options.onChild(forked.child); | ||
forked.child.on('message', function(data) { | ||
if (data.owner !== 'farm') { | ||
return; | ||
} | ||
this.receive(data); | ||
}.bind(this)) | ||
forked.child.once('exit', function (code) { | ||
@@ -134,3 +142,3 @@ c.exitCode = code | ||
if (child) { | ||
child.send('die') | ||
child.send({owner: 'farm', event: 'die'}) | ||
setTimeout(function () { | ||
@@ -241,3 +249,4 @@ if (child.exitCode === null) | ||
child.send({ | ||
idx : idx | ||
owner : 'farm' | ||
, idx : idx | ||
, child : childId | ||
@@ -244,0 +253,0 @@ , method : call.method |
@@ -23,3 +23,3 @@ 'use strict' | ||
child.send({ module: forkModule }) | ||
child.send({ owner: 'farm', module: forkModule }) | ||
@@ -26,0 +26,0 @@ // return a send() function for this child |
@@ -29,3 +29,3 @@ 'use strict' | ||
return farms[i].farm.end(callback) | ||
process.nextTick(callback.bind(null, 'Worker farm not found!')) | ||
process.nextTick(callback.bind(null, new Error('Worker farm not found!'))) | ||
} | ||
@@ -32,0 +32,0 @@ |
{ | ||
"name": "worker-farm", | ||
"description": "Distribute processing tasks to child processes with an über-simple API and baked-in durability & custom concurrency options.", | ||
"version": "1.6.0", | ||
"version": "1.7.0", | ||
"homepage": "https://github.com/rvagg/node-worker-farm", | ||
@@ -24,3 +24,3 @@ "authors": [ | ||
"devDependencies": { | ||
"tape": "~4.9.0" | ||
"tape": "~4.10.1" | ||
}, | ||
@@ -27,0 +27,0 @@ "scripts": { |
@@ -1,2 +0,2 @@ | ||
# Worker Farm [![Build Status](https://secure.travis-ci.org/rvagg/node-worker-farm.png)](http://travis-ci.org/rvagg/node-worker-farm) | ||
# Worker Farm [![Build Status](https://secure.travis-ci.org/rvagg/node-worker-farm.svg)](http://travis-ci.org/rvagg/node-worker-farm) | ||
@@ -114,2 +114,3 @@ [![NPM](https://nodei.co/npm/worker-farm.png?downloads=true&downloadRank=true&stars=true)](https://nodei.co/npm/worker-farm/) [![NPM](https://nodei.co/npm-dl/worker-farm.png?months=6&height=3)](https://nodei.co/npm/worker-farm/) | ||
, autoStart : false | ||
, onChild : function() {} | ||
} | ||
@@ -134,2 +135,4 @@ ``` | ||
* **<code>onChild</code>** when new child process starts this callback will be called with subprocess object as an argument. Use this when you need to add some custom communication with child processes. | ||
### workerFarm.end(farm) | ||
@@ -136,0 +139,0 @@ |
@@ -54,2 +54,18 @@ 'use strict' | ||
tape('on child', function (t) { | ||
t.plan(2) | ||
let child = workerFarm({ onChild: function(subprocess) { childPid = subprocess.pid } }, childPath) | ||
, childPid = null; | ||
child(0, function(err, pid) { | ||
t.equal(childPid, pid) | ||
}) | ||
workerFarm.end(child, function () { | ||
t.ok(true, 'workerFarm ended') | ||
}) | ||
}) | ||
// use the returned pids to check that we're using a single child process | ||
@@ -307,6 +323,8 @@ // when maxConcurrentWorkers = 1 | ||
let time = Date.now() - start | ||
// (defer * (count / callsPerWorker + 1)) - if precise it'd be count/callsPerWorker | ||
let min = defer * 1.5 | ||
// (defer * (count / callsPerWorker + 2)) - if precise it'd be count/callsPerWorker | ||
// but accounting for IPC and other overhead, we need to give it a bit of extra time, | ||
// hence the +1 | ||
t.ok(time > (defer * 1.5) && time < (defer * (count / callsPerWorker + 1)), 'processed tasks concurrently (' + time + 'ms)') | ||
// hence the +2 | ||
let max = defer * (count / callsPerWorker + 2) | ||
t.ok(time > min && time < max, 'processed tasks concurrently (' + time + ' > ' + min + ' && ' + time + ' < ' + max + ')') | ||
workerFarm.end(child, function () { | ||
@@ -479,2 +497,36 @@ t.ok(true, 'workerFarm ended') | ||
tape('test maxConcurrentCalls + queue', function (t) { | ||
t.plan(13) | ||
let child = workerFarm({ maxConcurrentCalls: 4, maxConcurrentWorkers: 2, maxConcurrentCallsPerWorker: 1 }, childPath) | ||
child(20, function (err) { console.log('ended short1'); t.notOk(err, 'no error, short call 1') }) | ||
child(20, function (err) { console.log('ended short2'); t.notOk(err, 'no error, short call 2') }) | ||
child(300, function (err) { t.notOk(err, 'no error, long call 1') }) | ||
child(300, function (err) { t.notOk(err, 'no error, long call 2') }) | ||
child(20, function (err) { | ||
t.ok(err, 'short call 3 should error') | ||
t.equal(err.type, 'MaxConcurrentCallsError', 'correct error type') | ||
}) | ||
child(20, function (err) { | ||
t.ok(err, 'short call 4 should error') | ||
t.equal(err.type, 'MaxConcurrentCallsError', 'correct error type') | ||
}) | ||
// cross fingers and hope the two short jobs have ended | ||
setTimeout(function () { | ||
child(20, function (err) { t.notOk(err, 'no error, delayed short call 1') }) | ||
child(20, function (err) { t.notOk(err, 'no error, delayed short call 2') }) | ||
child(20, function (err) { | ||
t.ok(err, 'delayed short call 3 should error') | ||
t.equal(err.type, 'MaxConcurrentCallsError', 'correct error type') | ||
}) | ||
workerFarm.end(child, function () { | ||
t.ok(true, 'workerFarm ended') | ||
}) | ||
}, 250) | ||
}) | ||
// this test should not keep the process running! if the test process | ||
@@ -535,3 +587,3 @@ // doesn't die then the problem is here | ||
cwd : cwd | ||
, execArgv : ['--no-warnings'] | ||
, execArgv : ['--expose-gc'] | ||
} | ||
@@ -541,3 +593,3 @@ , child = workerFarm({ maxConcurrentWorkers: 1, maxRetries: 5, workerOptions: workerOptions}, childPath, ['args']) | ||
child.args(function (err, result) { | ||
t.equal(result.execArgv[0], '--no-warnings', 'flags passed (overridden default)') | ||
t.equal(result.execArgv[0], '--expose-gc', 'flags passed (overridden default)') | ||
t.equal(result.cwd, cwd, 'correct cwd folder') | ||
@@ -544,0 +596,0 @@ }) |
Sorry, the diff of this file is not supported yet
49864
1079
151