Socket
Socket
Sign inDemoInstall

worker-farm

Package Overview
Dependencies
8
Maintainers
1
Versions
22
Alerts
File Explorer

Advanced tools

Install Socket

Detect and block malicious and high-risk dependencies

Install

Comparing version 0.2.1 to 0.3.0

22

lib/child/index.js
var $module
/*
var contextProto = this.context;
while (contextProto = Object.getPrototypeOf(contextProto)) {
completionGroups.push(Object.getOwnPropertyNames(contextProto));
}
*/
function handle (data) {

@@ -9,7 +16,12 @@ var idx = data.idx

, callback = function () {
process.send({
idx : idx
, child : child
, args : Array.prototype.slice.call(arguments)
})
var _args = Array.prototype.slice.call(arguments)
if (_args[0] instanceof Error) {
_args[0] = {
'$error' : '$error'
, 'type' : _args[0].constructor.name
, 'message' : _args[0].message
, 'stack' : _args[0].stack
}
}
process.send({ idx: idx, child: child, args: _args })
}

@@ -16,0 +28,0 @@ , exec

@@ -5,12 +5,16 @@ const DEFAULT_OPTIONS = {

, maxConcurrentCallsPerWorker : 10
, maxConcurrentCalls : -1
, maxCallTime : 0 // exceed this and the whole worker is terminated
, forcedKillTime : 100
}
const extend = require('xtend')
, TimeoutError = require('errno').create('TimeoutError')
, fork = require('./fork')
const extend = require('xtend')
, fork = require('./fork')
, TimeoutError = require('errno').create('TimeoutError')
, MaxConcurrentCallsError = require('errno').create('MaxConcurrentCallsError')
function Farm (options, path) {
this.options = extend(DEFAULT_OPTIONS, options)
this.path = path
this.options = extend(DEFAULT_OPTIONS, options)
this.path = path
this.activeCalls = 0
}

@@ -22,2 +26,8 @@

var args = Array.prototype.slice.call(arguments)
if (this.options.maxConcurrentCalls > 0 && this.activeCalls >= this.options.maxConcurrentCalls) {
var err = new MaxConcurrentCallsError('Too many concurrent calls (' + this.activeCalls + ')')
if (typeof args[args.length - 1] == 'function')
return process.nextTick(args[args.length - 1].bind(null, err))
throw err
}
this.addCall({

@@ -72,15 +82,34 @@ method : method

Farm.prototype.startChild = function () {
this.children[++this.childId] = {
send : fork(this.path, this.receive.bind(this), this.onExit.bind(this, this.childId))
, calls : []
, activeCalls : 0
}
this.childId++
var forked = fork(this.path)
, id = this.childId
, c = {
send : forked.send
, child : forked.child
, calls : []
, activeCalls : 0
, exitCode : null
}
forked.child.on('message', this.receive.bind(this))
forked.child.once('exit', function (code) {
c.exitCode = code
this.onExit(id)
}.bind(this))
this.activeChildren++
this.children[id] = c
}
// stop a worker, identified by id
Farm.prototype.stopChild = function (child) {
if (this.children[child]) {
this.children[child].send('die')
;delete this.children[child]
Farm.prototype.stopChild = function (childId) {
var child = this.children[childId]
if (child) {
child.send('die')
setTimeout(function () {
if (child.exitCode === null)
child.child.kill('SIGKILL')
}, this.options.forcedKillTime)
;delete this.children[childId]
this.activeChildren--

@@ -118,2 +147,17 @@ }

if (args[0] && args[0].$error == '$error') {
var e = args[0]
switch (e.type) {
case 'TypeError': args[0] = new TypeError(e.message); break
case 'RangeError': args[0] = new RangeError(e.message); break
case 'EvalError': args[0] = new EvalError(e.message); break
case 'ReferenceError': args[0] = new ReferenceError(e.message); break
case 'SyntaxError': args[0] = new SyntaxError(e.message); break
case 'URIError': args[0] = new URIError(e.message); break
default: args[0] = new Error(e.message)
}
args[0].type = e.type
args[0].stack = e.stack
}
process.nextTick(function () {

@@ -125,2 +169,3 @@ call.callback.apply(null, args)

child.activeCalls--
this.activeCalls--

@@ -162,2 +207,3 @@ if (this.options.maxCallsPerWorker != -1

child.activeCalls++
this.activeCalls++

@@ -265,2 +311,3 @@ child.send({

module.exports = Farm
module.exports = Farm
module.exports.TimeoutError = TimeoutError
const childProcess = require('child_process')
, childModule = require.resolve('./child/index')
function fork (forkModule, receive, onExit) {
function fork (forkModule) {
var child = childProcess.fork(childModule, {

@@ -10,13 +10,14 @@ env: process.env

child.on('message', receive)
child.on('exit', onExit)
child.send({ module: forkModule })
// return a send() function for this child
return function (data) {
try {
child.send(data)
} catch (e) {
// this *should* be picked up by onExit and the operation requeued
}
return {
send : function (data) {
try {
child.send(data)
} catch (e) {
// this *should* be picked up by onExit and the operation requeued
}
}
, child : child
}

@@ -23,0 +24,0 @@ }

{
"name" : "worker-farm"
, "description" : "Distribute processing tasks to child processes with an über-simple API and baked-in durability & custom concurrency options."
, "version" : "0.2.1"
, "version" : "0.3.0"
, "homepage" : "https://github.com/rvagg/node-worker-farm"

@@ -6,0 +6,0 @@ , "authors" : [

@@ -109,2 +109,3 @@ # Worker Farm [![Build Status](https://secure.travis-ci.org/rvagg/node-worker-farm.png)](http://travis-ci.org/rvagg/node-worker-farm)

, maxConcurrentCallsPerWorker : 10
, maxConcurrentCalls : -1
, maxCallTime : 0

@@ -120,2 +121,4 @@ }

* **<code>maxConcurrentCalls</code>** allows you to control the maximum number of calls in the queue&mdash;either actively being processed or waiting for a worker to be processed. `-1` indicates no limit but if you have conditions that may endlessly queue jobs and you need to set a limit then provide a `>0` value and any calls that push the limit will return on their callback with a `MaxConcurrentCallsError` error (check `err.type == 'MaxConcurrentCallsError'`).
* **<code>maxCallTime</code>** *(use with caution, understand what this does before you use it!)* when `> 0`, will cap a time, in milliseconds, that *any single call* can take to execute in a worker. If this time limit is exceeded by just a single call then the worker running that call will be killed and any calls running on that worker will have their callbacks returned with a `TimeoutError` (check `err.type == 'TimeoutError'`). If you are running with `'maxConcurrentCallsperWorker'` value greater than `1` then **all calls currently executing** will fail and not be automatically resubmitted. Use this if you have jobs that may potentially end in infinite loops that you can't programatically end with your child code. Preferably run this with a `'maxConcurrentCallsperWorker'` so you don't interrupt other calls when you have a timeout. This timeout operates on a per-call basis but will interrupt a whole worker.

@@ -122,0 +125,0 @@

@@ -16,2 +16,12 @@ module.exports = function (timeout, callback) {

callback(null, id, process.pid)
}
module.exports.err = function (type, message, callback) {
if (type == 'TypeError')
return callback(new TypeError(message))
callback(new Error(message))
}
module.exports.block = function () {
while (true);
}

@@ -22,3 +22,3 @@ var tape = require('tape')

t.ok(pid < process.pid + 500, 'pid makes sense')
t.ok(rnd > 0 && rnd < 1, 'rnd result makes sense')
t.ok(rnd >= 0 && rnd < 1, 'rnd result makes sense')
})

@@ -37,7 +37,5 @@

child.run0(function (err, pid, rnd) {
t.ok(pid > process.pid, 'pid makes sense')
t.ok(pid < process.pid + 500, 'pid makes sense')
t.ok(rnd > 0 && rnd < 1, 'rnd result makes sense')
t.ok(rnd >= 0 && rnd < 1, 'rnd result makes sense')
})

@@ -183,3 +181,3 @@

var time = Date.now() - start
t.ok(time > 100 && time < 175, 'processed tasks concurrently (' + time + 'ms)')
t.ok(time > 100 && time < 200, 'processed tasks concurrently (' + time + 'ms)')
workerFarm.end(child, function () {

@@ -278,3 +276,3 @@ t.ok(true, 'workerFarm ended')

t.ok(pid < process.pid + 500, 'pid makes sense ' + pid + ' vs ' + process.pid)
t.ok(rnd > 0 && rnd < 1, 'rnd result makes sense ' + rnd)
t.ok(rnd >= 0 && rnd < 1, 'rnd result makes sense')
})

@@ -345,1 +343,62 @@

})
tape('test error passing', function (t) {
t.plan(7)
var child = workerFarm(childPath, [ 'err' ])
child.err('Error', 'this is an Error', function (err) {
t.ok(err instanceof Error, 'is an Error object')
t.equal('Error', err.type, 'correct type')
t.equal('this is an Error', err.message, 'correct message')
})
child.err('TypeError', 'this is a TypeError', function (err) {
t.ok(err instanceof Error, 'is a TypeError object')
t.equal('TypeError', err.type, 'correct type')
t.equal('this is a TypeError', err.message, 'correct message')
})
workerFarm.end(child, function () {
t.ok(true, 'workerFarm ended')
})
})
tape('test maxConcurrentCalls', function (t) {
t.plan(10)
var child = workerFarm({ maxConcurrentCalls: 5 }, childPath)
child(50, function (err) { t.notOk(err, 'no error') })
child(50, function (err) { t.notOk(err, 'no error') })
child(50, function (err) { t.notOk(err, 'no error') })
child(50, function (err) { t.notOk(err, 'no error') })
child(50, function (err) { t.notOk(err, 'no error') })
child(50, function (err) {
t.ok(err)
t.equal(err.type, 'MaxConcurrentCallsError', 'correct error type')
})
child(50, function (err) {
t.ok(err)
t.equal(err.type, 'MaxConcurrentCallsError', 'correct error type')
})
workerFarm.end(child, function () {
t.ok(true, 'workerFarm ended')
})
})
// this test should not keep the process running! if the test process
// doesn't die then the problem is here
tape('test timeout kill', function (t) {
t.plan(3)
var child = workerFarm({ maxCallTime: 250, maxConcurrentWorkers: 1 }, childPath, [ 'block' ])
child.block(function (err) {
t.ok(err, 'got an error')
t.equal(err.type, 'TimeoutError', 'correct error type')
})
workerFarm.end(child, function () {
t.ok(true, 'workerFarm ended')
})
})
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc