Socket
Socket
Sign inDemoInstall

worker-farm

Package Overview
Dependencies
6
Maintainers
1
Versions
22
Alerts
File Explorer

Advanced tools

Install Socket

Detect and block malicious and high-risk dependencies

Install

Comparing version 0.0.1 to 0.1.0

42

lib/child/index.js
var $module
var handle = function (data) {
var idx = data.idx
, child = data.child
, method = data.method
, args = data.args
, callback = function () {
process.send({
idx : idx
, child : child
, args : Array.prototype.slice.call(arguments)
})
}
, exec
function handle (data) {
var idx = data.idx
, child = data.child
, method = data.method
, args = data.args
, callback = function () {
process.send({
idx : idx
, child : child
, args : Array.prototype.slice.call(arguments)
})
}
, exec
if (method == null && typeof $module == 'function')
exec = $module
else if (typeof $module[method] == 'function')
exec = $module[method]
if (method == null && typeof $module == 'function')
exec = $module
else if (typeof $module[method] == 'function')
exec = $module[method]
if (!exec)
return console.error('NO SUCH METHOD:', method)
if (!exec)
return console.error('NO SUCH METHOD:', method)
exec.apply(null, args.concat([ callback ]))
}
exec.apply(null, args.concat([ callback ]))
}

@@ -28,0 +28,0 @@ process.on('message', function (data) {

@@ -1,196 +0,213 @@

const fork = require('./fork')
const DEFAULT_OPTIONS = {
maxCallsPerWorker : -1
, maxConcurrentWorkers : require('os').cpus().length
, maxConcurrentCallsPerWorker : 10
}
var Farm = {
// make a handle to pass back in the form of an external API
mkhandle : function (method) {
return function () {
var args = Array.prototype.slice.call(arguments)
this.addCall({
method : method
, callback : args.pop()
, args : args
})
}.bind(this)
}
const extend = require('xtend')
, fork = require('./fork')
// a constructor of sorts
, setup: function (methods) {
var iface
if (!methods) { // single-function export
iface = this.mkhandle()
} else { // multiple functions on the export
iface = {}
methods.forEach(function (m) {
iface[m] = this.mkhandle(m)
}.bind(this))
}
function Farm (options, path) {
this.options = extend(DEFAULT_OPTIONS, options)
this.path = path
}
this.searchStart = -1
this.childId = -1
this.children = {}
this.activeChildren = 0
this.callQueue = []
// make a handle to pass back in the form of an external API
Farm.prototype.mkhandle = function (method) {
return function () {
var args = Array.prototype.slice.call(arguments)
this.addCall({
method : method
, callback : args.pop()
, args : args
})
}.bind(this)
}
return iface
}
// a constructor of sorts
Farm.prototype.setup = function (methods) {
var iface
if (!methods) { // single-function export
iface = this.mkhandle()
} else { // multiple functions on the export
iface = {}
methods.forEach(function (m) {
iface[m] = this.mkhandle(m)
}.bind(this))
}
// when a child exits, check if there are any outstanding jobs and requeue them
, onExit: function (childId) {
// delay this to give any sends a chance to finish
setTimeout(function () {
var doQueue = false
if (this.children[childId] && this.children[childId].activeCalls) {
this.children[childId].calls.reverse().forEach(function (call) {
if (call) {
this.callQueue.unshift(call)
doQueue = true
}
}.bind(this))
}
this.stopChild(childId)
doQueue && this.processQueue()
}.bind(this), 10)
}
this.searchStart = -1
this.childId = -1
this.children = {}
this.activeChildren = 0
this.callQueue = []
// start a new worker
, startChild: function () {
this.children[++this.childId] = {
send : fork(this.path, this.receive.bind(this), this.onExit.bind(this, this.childId))
, calls : []
, activeCalls : 0
}
this.activeChildren++
}
return iface
}
// stop a worker, identified by id
, stopChild: function (child) {
if (this.children[child]) {
this.children[child].send('die')
;delete this.children[child]
this.activeChildren--
}
// when a child exits, check if there are any outstanding jobs and requeue them
Farm.prototype.onExit = function (childId) {
// delay this to give any sends a chance to finish
setTimeout(function () {
var doQueue = false
if (this.children[childId] && this.children[childId].activeCalls) {
this.children[childId].calls.reverse().forEach(function (call) {
if (call) {
this.callQueue.unshift(call)
doQueue = true
}
}.bind(this))
}
this.stopChild(childId)
doQueue && this.processQueue()
}.bind(this), 10)
}
// called from a child process, the data contains information needed to
// look up the child and the original call so we can invoke the callback
, receive: function (data) {
var idx = data.idx
, child = data.child
, args = data.args
// start a new worker
Farm.prototype.startChild = function () {
this.children[++this.childId] = {
send : fork(this.path, this.receive.bind(this), this.onExit.bind(this, this.childId))
, calls : []
, activeCalls : 0
}
this.activeChildren++
}
if (this.children[child]) {
if (this.children[child].calls[idx]) {
this.children[child].calls[idx].callback.apply(null, args)
;delete this.children[child].calls[idx]
this.children[child].activeCalls--
if (this.options.maxCallsPerWorker != -1
&& this.children[child].calls.length >= this.options.maxCallsPerWorker
&& !Object.keys(this.children[child].calls).length) {
// this child has finished its run, kill it
this.stopChild(child)
}
// allow any outstanding calls to be processed
this.processQueue()
} else
console.error(
'Worker Farm: Received message for unknown index for existing child. '
+ 'This should not happen!'
)
} else
console.error(
'Worker Farm: Received message for unknown child. '
+ 'This is likely as a result of premature child death, the operation will have been re-queued.'
)
}
// stop a worker, identified by id
Farm.prototype.stopChild = function (child) {
if (this.children[child]) {
this.children[child].send('die')
;delete this.children[child]
this.activeChildren--
}
}
// send a call to a worker, identified by id
, send: function (child, call) {
this.children[child].calls.push(call)
this.children[child].activeCalls++
this.children[child].send({
idx : this.children[child].calls.length - 1
, child : child
, method : call.method
, args : call.args
})
}
// called from a child process, the data contains information needed to
// look up the child and the original call so we can invoke the callback
Farm.prototype.receive = function (data) {
var idx = data.idx
, child = data.child
, args = data.args
// a list of active worker ids, in order, but the starting offset is
// shifted each time this method is called, so we work our way through
// all workers when handing out jobs
, childKeys: function () {
var cka = Object.keys(this.children)
, cks
if (this.searchStart >= cka.length - 1) this.searchStart = 0
else this.searchStart++
cks = cka.splice(0, this.searchStart)
return cka.concat(cks)
}
if (this.children[child]) {
if (this.children[child].calls[idx]) {
this.children[child].calls[idx].callback.apply(null, args)
;delete this.children[child].calls[idx]
this.children[child].activeCalls--
if (this.options.maxCallsPerWorker != -1
&& this.children[child].calls.length >= this.options.maxCallsPerWorker
&& !Object.keys(this.children[child].calls).length) {
// this child has finished its run, kill it
this.stopChild(child)
}
// allow any outstanding calls to be processed
this.processQueue()
} else
console.error(
'Worker Farm: Received message for unknown index for existing child. '
+ 'This should not happen!'
)
} else
console.error(
'Worker Farm: Received message for unknown child. '
+ 'This is likely as a result of premature child death, the operation will have been re-queued.'
)
}
// Calls are added to a queue, this processes the queue and is called
// whenever there might be a chance to send more calls to the workers.
// The various options all impact on when we're able to send calls,
// they may need to be kept in a queue until a worker is ready.
, processQueue: function () {
var cka, i = 0
// send a call to a worker, identified by id
Farm.prototype.send = function (child, call) {
this.children[child].calls.push(call)
this.children[child].activeCalls++
this.children[child].send({
idx : this.children[child].calls.length - 1
, child : child
, method : call.method
, args : call.args
})
}
if (!this.callQueue.length) {
if (this.ending) this.end()
return
}
// a list of active worker ids, in order, but the starting offset is
// shifted each time this method is called, so we work our way through
// all workers when handing out jobs
Farm.prototype.childKeys = function () {
var cka = Object.keys(this.children)
, cks
if (this.activeChildren < this.options.maxConcurrentWorkers)
this.startChild()
if (this.searchStart >= cka.length - 1)
this.searchStart = 0
else
this.searchStart++
for (cka = this.childKeys(); i < cka.length; i++) {
if ((this.options.maxConcurrentCallsPerWorker == -1
|| this.children[cka[i]].activeCalls < this.options.maxConcurrentCallsPerWorker)
&& (this.options.maxCallsPerWorker == -1
|| this.children[cka[i]].calls.length < this.options.maxCallsPerWorker)) {
cks = cka.splice(0, this.searchStart)
this.send(cka[i], this.callQueue.shift())
if (!this.callQueue.length) {
if (this.ending) this.end()
return
}
} /*else {
console.log(
this.options.maxConcurrentCallsPerWorker == -1
, this.children[cka[i]].activeCalls < this.options.maxConcurrentCallsPerWorker
, this.options.maxCallsPerWorker == -1
, this.children[cka[i]].calls.length < this.options.maxCallsPerWorker)
}*/
}
return cka.concat(cks)
}
if (this.ending) this.end()
}
// Calls are added to a queue, this processes the queue and is called
// whenever there might be a chance to send more calls to the workers.
// The various options all impact on when we're able to send calls,
// they may need to be kept in a queue until a worker is ready.
Farm.prototype.processQueue = function () {
var cka, i = 0
// add a new call to the call queue, then trigger a process of the queue
, addCall: function (call) {
if (this.ending) return this.end() // don't add anything new to the queue
this.callQueue.push(call)
this.processQueue()
}
if (!this.callQueue.length)
return this.ending && this.end()
// kills child workers when they're all done
, end: function (callback) {
var complete = true
if (this.ending === false) return
if (callback) this.ending = callback
else if (this.ending == null) this.ending = true
Object.keys(this.children).forEach(function (child) {
if (!this.children[child]) return
if (!this.children[child].activeCalls)
this.stopChild(child)
else
complete = false
}.bind(this))
if (this.activeChildren < this.options.maxConcurrentWorkers)
this.startChild()
if (complete && typeof this.ending == 'function') {
this.ending()
this.ending = false
}
}
}
for (cka = this.childKeys(); i < cka.length; i++) {
if ((this.options.maxConcurrentCallsPerWorker == -1
|| this.children[cka[i]].activeCalls < this.options.maxConcurrentCallsPerWorker)
&& (this.options.maxCallsPerWorker == -1
|| this.children[cka[i]].calls.length < this.options.maxCallsPerWorker)) {
this.send(cka[i], this.callQueue.shift())
if (!this.callQueue.length)
return this.ending && this.end()
} /*else {
console.log(
this.options.maxConcurrentCallsPerWorker == -1
, this.children[cka[i]].activeCalls < this.options.maxConcurrentCallsPerWorker
, this.options.maxCallsPerWorker == -1
, this.children[cka[i]].calls.length < this.options.maxCallsPerWorker)
}*/
}
if (this.ending)
this.end()
}
// add a new call to the call queue, then trigger a process of the queue
Farm.prototype.addCall = function (call) {
if (this.ending)
return this.end() // don't add anything new to the queue
this.callQueue.push(call)
this.processQueue()
}
// kills child workers when they're all done
Farm.prototype.end = function (callback) {
var complete = true
if (this.ending === false)
return
if (callback)
this.ending = callback
else if (this.ending == null)
this.ending = true
Object.keys(this.children).forEach(function (child) {
if (!this.children[child])
return
if (!this.children[child].activeCalls)
this.stopChild(child)
else
complete = false
}.bind(this))
if (complete && typeof this.ending == 'function') {
this.ending()
this.ending = false
}
}
module.exports = Farm
const childProcess = require('child_process')
, childModule = require.resolve('./child/index')
var fork = function (forkModule, receive, onExit) {
var child = childProcess.fork(childModule, {
env: process.env
, cwd: process.cwd()
})
function fork (forkModule, receive, onExit) {
var child = childProcess.fork(childModule, {
env: process.env
, cwd: process.cwd()
})
child.on('message', receive)
child.on('exit', onExit)
child.send({ module: forkModule })
// return a send() function for this child
return function (data) {
try {
child.send(data)
} catch (e) {
// this *should* be picked up by onExit and the operation requeued
}
}
child.on('message', receive)
child.on('exit', onExit)
child.send({ module: forkModule })
// return a send() function for this child
return function (data) {
try {
child.send(data)
} catch (e) {
// this *should* be picked up by onExit and the operation requeued
}
}
}
module.exports = fork

@@ -1,40 +0,29 @@

const DEFAULT_OPTIONS = {
maxCallsPerWorker : -1
, maxConcurrentWorkers : require('os').cpus().length
, maxConcurrentCallsPerWorker : 10
}
, extend = require('util')._extend
, Farm = require('./farm')
, farms = [] // keep record of farms so we can end() them if required
const Farm = require('./farm')
var farm = function (options, path, methods) {
var farm
var farms = [] // keep record of farms so we can end() them if required
if (typeof options == 'string') {
methods = path
path = options
options = {}
}
function farm (options, path, methods) {
if (typeof options == 'string') {
methods = path
path = options
options = {}
}
options = extend(Object.create(DEFAULT_OPTIONS), options)
var f = new Farm(options, path)
, api = f.setup(methods)
farm = Object.create(Farm, {
options : { writable: false, value: options }
, path : { writable: false, value: path }
})
farms.push({ farm: f, api: api })
farms.push({ farm: farm, api: farm.setup(methods) })
// return the public API
return api
}
// return the public API
return farms[farms.length - 1].api
}
function end (api, callback) {
for (var i = 0; i < farms.length; i++)
if (farms[i] && farms[i].api === api)
return farms[i].farm.end(callback)
process.nextTick(callback.bind(null, 'Worker farm not found!'))
}
, end = function (api, callback) {
for (var i = 0; i < farms.length; i++)
if (farms[i] && farms[i].api === api)
return farms[i].farm.end(callback)
process.nextTick(callback.bind(null, 'Worker farm not found!'))
}
module.exports = farm
module.exports.end = end
{
"name" : "worker-farm"
, "description" : "Distribute processing tasks to child processes with an über-simple API and baked-in durability & custom concurrency options."
, "version" : "0.0.1"
, "version" : "0.1.0"
, "homepage" : "https://github.com/rvagg/node-worker-farm"

@@ -16,2 +16,3 @@ , "authors" : [

, "dependencies" : {
"xtend" : "~2.0.6"
}

@@ -18,0 +19,0 @@ , "devDependencies" : {

# Worker Farm [![Build Status](https://secure.travis-ci.org/rvagg/node-worker-farm.png)](http://travis-ci.org/rvagg/node-worker-farm)
[![NPM](https://nodei.co/npm/worker-farm.png?downloads=true&stars=true)](https://nodei.co/npm/worker-farm/) [![NPM](https://nodei.co/npm-dl/worker-farm.png?months=6)](https://nodei.co/npm/worker-farm/)
Distribute processing tasks to child processes with an über-simple API and baked-in durability & custom concurrency options. *Available in npm as <strong>worker-farm</strong>*.

@@ -119,2 +122,4 @@

Any calls that are queued and not yet being handled by a child process will be discarded. `end()` only waits for those currently in progress.
Once you end a farm, it won't handle any more calls, so don't even try!

@@ -121,0 +126,0 @@

@@ -21,3 +21,3 @@ var tape = require('tape')

t.ok(pid > process.pid, 'pid makes sense')
t.ok(pid < process.pid + 100, 'pid makes sense')
t.ok(pid < process.pid + 250, 'pid makes sense')
t.ok(rnd > 0 && rnd < 1, 'rnd result makes sense')

@@ -36,3 +36,3 @@ })

t.ok(pid > process.pid, 'pid makes sense')
t.ok(pid < process.pid + 100, 'pid makes sense')
t.ok(pid < process.pid + 250, 'pid makes sense')
t.ok(rnd > 0 && rnd < 1, 'rnd result makes sense')

@@ -246,5 +246,5 @@ })

child(0, function (err, pid, rnd) {
t.ok(pid > process.pid, 'pid makes sense')
t.ok(pid < process.pid + 100, 'pid makes sense')
t.ok(rnd > 0 && rnd < 1, 'rnd result makes sense')
t.ok(pid > process.pid, 'pid makes sense ' + pid + ' vs ' + process.pid)
t.ok(pid < process.pid + 250, 'pid makes sense ' + pid + ' vs ' + process.pid)
t.ok(rnd > 0 && rnd < 1, 'rnd result makes sense ' + rnd)
})

@@ -254,3 +254,3 @@

t.pass('an .end() callback was successfully called')
});
})
})

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc