Comparing version 3.0.0-alpha.2 to 3.0.0-alpha.3
@@ -10,2 +10,4 @@ const cluster = require('cluster'), | ||
const pEvent = require('p-event'); | ||
/** | ||
@@ -63,5 +65,3 @@ * @param {Object} context | ||
* */ | ||
this._initPromise = new Promise(resolve => { | ||
this.once('initialized', resolve); | ||
}); | ||
this._initPromise = pEvent(this, 'initialized'); | ||
@@ -115,2 +115,11 @@ /** | ||
configure(config, applyEnv, basedir) { | ||
this._setConfig(config, applyEnv, basedir); | ||
if (this.config) { | ||
this.emit('configured'); | ||
} | ||
return this; | ||
} | ||
_setConfig(config, applyEnv, basedir) { | ||
if (typeof applyEnv === 'undefined' || applyEnv) { | ||
@@ -134,7 +143,3 @@ Configuration.applyEnvironment(config); | ||
this.setMaxListeners(this.config.get('maxEventListeners', 100)); | ||
this.emit('configured'); | ||
} | ||
return this; | ||
} | ||
@@ -141,0 +146,0 @@ |
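Note on the `p-event` change above: `pEvent(emitter, eventName)` resolves a promise on the first emission of that event, which is exactly what the removed hand-rolled `new Promise(...once...)` wrapper did. A minimal standalone sketch of the equivalence (not part of the diff; note that by default `p-event` additionally rejects the promise if the emitter fires 'error' first):

```js
const EventEmitter = require('events');
const pEvent = require('p-event');

const emitter = new EventEmitter();

// before: manual one-shot wrapper
const manual = new Promise(resolve => emitter.once('initialized', resolve));
// after: same behaviour in one call (also rejects on 'error' by default)
const viaPEvent = pEvent(emitter, 'initialized');

emitter.emit('initialized');
Promise.all([manual, viaPEvent]).then(() => console.log('both resolved'));
```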
@@ -23,2 +23,6 @@ const path = require('path'), | ||
get raw() { | ||
return this._rawConfig; | ||
} | ||
/** | ||
@@ -97,2 +101,12 @@ * @param {String} path | ||
extend(config) { | ||
return new Configuration( | ||
{ | ||
...this.raw, | ||
...config, | ||
}, | ||
this._resolveBaseDir, | ||
); | ||
} | ||
/** | ||
@@ -99,0 +113,0 @@ * Override config properties using `LUSTER_CONF` environment variable. |
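Note on the new `raw` getter and `extend()` above: `extend()` spreads the current raw config under the overrides and wraps the result in a fresh Configuration, so the merge is shallow. A hypothetical usage sketch — it assumes lib/configuration.js exports the Configuration class (as its use in cluster_process.js suggests) and the constructor signature visible in the diff, `(rawObject, basedir)`:

```js
const Configuration = require('luster/lib/configuration');

const base = new Configuration({workers: 4, server: {port: 8080}}, __dirname);
const forExtraPool = base.extend({
    workers: 1,
    server: {port: 10081},  // shallow spread: the whole `server` object is replaced, not deep-merged
});

console.log(base.get('workers'));         // 4 — the original instance is untouched
console.log(forExtraPool.get('workers')); // 1
console.log(forExtraPool.raw);            // the merged plain object exposed by the new getter
```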
@@ -97,2 +97,24 @@ const Terror = require('terror'), | ||
/** | ||
* @constructor | ||
* @class LusterMasterError | ||
* @augments LusterError | ||
*/ | ||
errors.LusterMasterError = LusterError.create('LusterMasterError', | ||
{ | ||
POOL_KEY_ALREADY_TAKEN: | ||
'Pool key "%key%" is already taken', | ||
POOL_DOES_NOT_EXIST: | ||
'Pool with key "%key%" does not exist' | ||
}); | ||
module.exports = errors; |
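Note on the new error codes above: they follow the existing terror-based pattern, where `LusterError.create(name, codes)` produces an error class with a `CODES` map and a static `createError(code, data)` that interpolates `%key%`-style placeholders from the data object. A hedged sketch of how master.js uses them in this diff (the require path assumes the published lib/ layout):

```js
const {LusterMasterError} = require('luster/lib/errors');

try {
    // what Master#createPool does when the key is already taken
    throw LusterMasterError.createError(
        LusterMasterError.CODES.POOL_KEY_ALREADY_TAKEN,
        {key: 'api'}
    );
} catch (err) {
    // terror fills the %key% placeholder from the data object
    console.error(err.message); // something like: Pool key "api" is already taken
}
```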
@@ -20,4 +20,6 @@ const util = require('util'), | ||
console.log('%s(%s).emit(%s)', this.constructor.name || 'EventEmitterEx', this.wid, inspectedArgs); | ||
const key = this.eexKey; | ||
console.log('%s(%s).emit(%s)', this.constructor.name || 'EventEmitterEx', key, inspectedArgs); | ||
return EventEmitter.prototype.emit.apply(this, args); | ||
@@ -24,0 +26,0 @@ }; |
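Note on the `eexKey` change above: the debug trace previously printed `this.wid`, which only WorkerWrapper had; it now prints a generic `eexKey`, so every EventEmitterEx subclass can label itself (Master sets 0, WorkerWrapper uses its wid, RestartQueue takes it as a constructor argument). Illustrative sketch only — the trace is printed only when luster's EventEmitterEx debug logging is enabled, and that condition sits outside this hunk:

```js
const EventEmitterEx = require('luster/lib/event_emitter_ex');

class Demo extends EventEmitterEx {}
const d = new Demo();
d.eexKey = 'pool:api';
d.emit('configured');
// with the emit tracing active, this prints something like:
//   Demo(pool:api).emit('configured')
```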
@@ -1,9 +0,11 @@ | ||
const os = require('os'), | ||
cluster = require('cluster'), | ||
ClusterProcess = require('./cluster_process'), | ||
WorkerWrapper = require('./worker_wrapper'), | ||
Port = require('./port'), | ||
RestartQueue = require('./restart_queue'), | ||
RPC = require('./rpc'); | ||
const cluster = require('cluster'); | ||
const ClusterProcess = require('./cluster_process'); | ||
const LusterMasterError = require('./errors').LusterMasterError; | ||
const RPC = require('./rpc'); | ||
const WorkerPool = require('./worker-pool'); | ||
const WorkerWrapper = require('./worker_wrapper'); | ||
const DEFAULT_POOL_KEY = '__default'; | ||
/** | ||
@@ -19,17 +21,2 @@ * @constructor | ||
/** | ||
* @type {Object} | ||
* @property {WorkerWrapper} * | ||
* @public | ||
* @todo make it private or public immutable | ||
*/ | ||
this.workers = {}; | ||
/** | ||
* Workers restart queue. | ||
* @type {RestartQueue} | ||
* @private | ||
*/ | ||
this._restartQueue = new RestartQueue(); | ||
/** | ||
* Configuration object to pass to cluster.setupMaster() | ||
@@ -41,9 +28,9 @@ * @type {Object} | ||
this.pools = new Map(); | ||
this.id = 0; | ||
this.wid = 0; | ||
this.eexKey = 0; | ||
this.pid = process.pid; | ||
this.on('worker state', this._cleanupUnixSockets.bind(this)); | ||
this.on('worker exit', this._checkWorkersAlive.bind(this)); | ||
// @todo make it optional? | ||
@@ -55,2 +42,48 @@ process.on('SIGINT', this._onSignalQuit.bind(this)); | ||
/** | ||
* @param {*} key | ||
* @returns {WorkerPool} | ||
*/ | ||
createPool(key) { | ||
if (this.pools.has(key)) { | ||
throw LusterMasterError.createError( | ||
LusterMasterError.CODES.POOL_KEY_ALREADY_TAKEN, | ||
{key} | ||
); | ||
} | ||
this.emit('create pool', key); | ||
const pool = new WorkerPool(key, this); | ||
this._proxyWorkerEvents(pool); | ||
pool.on('shutdown', this._checkPoolsAlive.bind(this)); | ||
this.pools.set(key, pool); | ||
return pool; | ||
} | ||
/** | ||
* @param {*} key | ||
* @returns {WorkerPool} | ||
*/ | ||
removePool(key) { | ||
const pool = this.pools.get(key); | ||
if (!pool) { | ||
throw LusterMasterError.createError( | ||
LusterMasterError.CODES.POOL_DOES_NOT_EXIST, | ||
{key} | ||
); | ||
} | ||
this.emit('remove pool', key); | ||
this.pools.delete(key); | ||
return pool; | ||
} | ||
getPool(key) { | ||
return this.pools.get(key); | ||
} | ||
getDefaultPool() { | ||
return this.getPool(DEFAULT_POOL_KEY); | ||
} | ||
/** | ||
* Allows same object structure as cluster.setupMaster(). | ||
@@ -78,43 +111,10 @@ * This function must be used instead of cluster.setupMaster(), | ||
/** | ||
* Remove not used unix socket before worker will try to listen it. | ||
* @param {WorkerWrapper} worker | ||
* @param {WorkerWrapperState} state | ||
* @private | ||
*/ | ||
_cleanupUnixSockets(worker, state) { | ||
const port = worker.options.port; | ||
if (this._restartQueue.has(worker) || | ||
state !== WorkerWrapper.STATES.LAUNCHING || | ||
port.family !== Port.UNIX) { | ||
return; | ||
} | ||
const inUse = this.getWorkersArray().some(w => | ||
worker.wid !== w.wid && | ||
w.isRunning() && | ||
port.isEqualTo(w.options.port) | ||
); | ||
if (!inUse) { | ||
port.unlink(err => { | ||
if (err) { | ||
this.emit('error', err); | ||
} | ||
}); | ||
} | ||
} | ||
/** | ||
* Check for alive workers, if no one here, then emit "shutdown". | ||
* @private | ||
*/ | ||
_checkWorkersAlive() { | ||
const workers = this.getWorkersArray(), | ||
alive = workers.reduce( | ||
(count, w) => w.dead ? count - 1 : count, | ||
workers.length | ||
); | ||
_checkPoolsAlive() { | ||
let dead = true; | ||
this.forEachPool(pool => dead = dead && pool.dead); | ||
if (alive === 0) { | ||
if (dead) { | ||
this.emit('shutdown'); | ||
@@ -125,13 +125,12 @@ } | ||
/** | ||
* Repeat WorkerWrapper events on Master and add 'worker ' prefix to event names | ||
* Repeat WorkerWrapper events from WorkerPool on Master | ||
* so for example 'online' became 'worker online' | ||
* @private | ||
* @param {WorkerWrapper} worker | ||
* @param {WorkerPool} pool | ||
*/ | ||
_proxyWorkerEvents(worker) { | ||
WorkerWrapper.EVENTS | ||
.forEach(eventName => { | ||
const proxyEventName = 'worker ' + eventName; | ||
worker.on(eventName, this.emit.bind(this, proxyEventName, worker)); | ||
}); | ||
_proxyWorkerEvents(pool) { | ||
for (const eventName of WorkerWrapper.EVENTS) { | ||
const proxyEventName = 'worker ' + eventName; | ||
pool.on(proxyEventName, this.emit.bind(this, proxyEventName)); | ||
} | ||
} | ||
@@ -143,7 +142,3 @@ | ||
getWorkersIds() { | ||
if (!this._workersIdsCache) { | ||
this._workersIdsCache = this.getWorkersArray().map(w => w.wid); | ||
} | ||
return this._workersIdsCache; | ||
return this.getWorkersArray().map(w => w.wid); | ||
} | ||
@@ -155,24 +150,13 @@ | ||
getWorkersArray() { | ||
if (!this._workersArrayCache) { | ||
this._workersArrayCache = Object.values(this.workers); | ||
} | ||
return this._workersArrayCache; | ||
let result = []; | ||
this.forEachPool( | ||
pool => result = result.concat(pool.getWorkersArray()) | ||
); | ||
return result; | ||
} | ||
/** | ||
* Add worker to the pool | ||
* @param {WorkerWrapper} worker | ||
* @returns {Master} self | ||
* @public | ||
*/ | ||
add(worker) { | ||
// invalidate Master#getWorkersIds and Master#getWorkersArray cache | ||
this._workersIdsCache = null; | ||
this._workersArrayCache = null; | ||
this.workers[worker.wid] = worker; | ||
this._proxyWorkerEvents(worker); | ||
return this; | ||
forEachPool(fn) { | ||
for (const pool of this.pools.values()) { | ||
fn(pool); | ||
} | ||
} | ||
@@ -190,4 +174,5 @@ | ||
forEach(fn) { | ||
this.getWorkersArray() | ||
.forEach(fn); | ||
this.forEachPool(pool => { | ||
pool.forEach(fn); | ||
}); | ||
@@ -207,2 +192,16 @@ return this; | ||
configure(...args) { | ||
this._setConfig(...args); | ||
if (this.config) { | ||
if (this.config.get('useDefaultPool', true)) { | ||
this.createPool(DEFAULT_POOL_KEY).configure(this.config); | ||
} | ||
this.emit('configured'); | ||
} | ||
return this; | ||
} | ||
/** | ||
@@ -223,44 +222,2 @@ * Configure cluster | ||
} | ||
const // WorkerWrapper options | ||
forkTimeout = this.config.get('control.forkTimeout'), | ||
stopTimeout = this.config.get('control.stopTimeout'), | ||
exitThreshold = this.config.get('control.exitThreshold'), | ||
allowedSequentialDeaths = this.config.get('control.allowedSequentialDeaths'), | ||
count = this.config.get('workers', os.cpus().length), | ||
isServerPortSet = this.config.has('server.port'), | ||
groups = this.config.get('server.groups', 1), | ||
workersPerGroup = Math.floor(count / groups); | ||
let port, | ||
// workers and groups count | ||
i = 0, | ||
group = 0, | ||
workersInGroup = 0; | ||
if (isServerPortSet) { | ||
port = new Port(this.config.get('server.port')); | ||
} | ||
// create pool of workers | ||
while (count > i++) { | ||
this.add(new WorkerWrapper(this, { | ||
forkTimeout, | ||
stopTimeout, | ||
exitThreshold, | ||
allowedSequentialDeaths, | ||
port: isServerPortSet ? port.next(group) : 0, | ||
maxListeners: this.getMaxListeners(), | ||
})); | ||
// groups > 1, current group is full and | ||
// last workers can form at least more one group | ||
if (groups > 1 && | ||
++workersInGroup >= workersPerGroup && | ||
count - (group + 1) * workersPerGroup >= workersPerGroup) { | ||
workersInGroup = 0; | ||
group++; | ||
} | ||
} | ||
} | ||
@@ -275,19 +232,8 @@ | ||
waitForWorkers(wids, event) { | ||
const pendingWids = new Set(wids); | ||
const promises = []; | ||
this.forEachPool( | ||
pool => promises.push(pool.waitForWorkers(wids, event)) | ||
); | ||
return new Promise(resolve => { | ||
if (pendingWids.size === 0) { | ||
resolve(); | ||
} | ||
const onWorkerState = worker => { | ||
const wid = worker.wid; | ||
pendingWids.delete(wid); | ||
if (pendingWids.size === 0) { | ||
this.removeListener(event, onWorkerState); | ||
resolve(); | ||
} | ||
}; | ||
this.on(event, onWorkerState); | ||
}); | ||
return Promise.all(promises); | ||
} | ||
@@ -317,3 +263,3 @@ | ||
// TODO maybe run this after starting waitForAllWorkers | ||
this.forEach(worker => worker.restart()); | ||
this.forEachPool(pool => pool.restart()); | ||
@@ -337,2 +283,12 @@ await this.waitForAllWorkers('worker ready'); | ||
async _softRestart() { | ||
const promises = []; | ||
this.forEachPool( | ||
pool => promises.push(pool.softRestart()) | ||
); | ||
await Promise.all(promises); | ||
this.emit('restarted'); | ||
} | ||
/** | ||
@@ -347,4 +303,3 @@ * Workers will be restarted one by one using RestartQueue. | ||
softRestart() { | ||
this.forEach(worker => worker.softRestart()); | ||
this._restartQueue.once('drain', this.emit.bind(this, 'restarted')); | ||
this._softRestart(); | ||
return this; | ||
@@ -354,15 +309,2 @@ } | ||
/** | ||
* Schedules one worker restart using RestartQueue. | ||
* If a worker becomes dead, it will be just removed from restart queue. However, if already dead worker is pushed | ||
* into the queue, it will emit 'error' on restart. | ||
* @public | ||
* @param {WorkerWrapper} worker | ||
* @returns {Master} self | ||
*/ | ||
scheduleWorkerRestart(worker) { | ||
this._restartQueue.push(worker); | ||
return this; | ||
} | ||
/** | ||
* @override | ||
@@ -384,11 +326,3 @@ * @see ClusterProcess | ||
remoteCallToAll(name, ...args) { | ||
this.forEach(worker => { | ||
if (worker.ready) { | ||
worker.remoteCall(name, ...args); | ||
} else { | ||
worker.on('ready', () => { | ||
worker.remoteCall(name, ...args); | ||
}); | ||
} | ||
}); | ||
this.forEachPool(pool => pool.remoteCallToAll(name, ...args)); | ||
} | ||
@@ -404,7 +338,3 @@ | ||
broadcastEventToAll(event, ...args) { | ||
this.forEach(worker => { | ||
if (worker.ready) { | ||
worker.broadcastEvent(event, ...args); | ||
} | ||
}); | ||
this.forEachPool(pool => pool.broadcastEventToAll(event, ...args)); | ||
} | ||
@@ -421,3 +351,3 @@ | ||
this.emit(event, ...args); | ||
this.broadcastEventToAll(event, ...args); | ||
this.forEachPool(pool => pool.emitToAll(event, ...args)); | ||
} | ||
@@ -430,16 +360,9 @@ | ||
async _shutdown() { | ||
const stoppedWorkers = []; | ||
const promises = []; | ||
this.forEachPool( | ||
pool => promises.push(pool._shutdown()) | ||
); | ||
this.forEach(worker => { | ||
if (worker.isRunning()) { | ||
worker.stop(); | ||
stoppedWorkers.push(worker.wid); | ||
} | ||
}); | ||
await Promise.all(promises); | ||
await this.waitForWorkers( | ||
stoppedWorkers, | ||
'worker exit', | ||
); | ||
this.emit('shutdown'); | ||
@@ -468,7 +391,3 @@ } | ||
remoteCallToAllWithCallback(opts) { | ||
this.forEach(worker => { | ||
if (worker.isRunning()) { | ||
worker.remoteCallWithCallback(opts); | ||
} | ||
}); | ||
this.forEachPool(pool => pool.remoteCallToAllWithCallback(opts)); | ||
} | ||
@@ -482,3 +401,3 @@ | ||
// TODO maybe run this after starting waitForAllWorkers | ||
this.forEach(worker => worker.run()); | ||
this.forEachPool(pool => pool.run()); | ||
@@ -485,0 +404,0 @@ await this.waitForAllWorkers('worker ready'); |
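The master.js hunks above replace the per-worker bookkeeping on Master (the workers map, restart queue, unix-socket cleanup, and fork loop) with a Map of WorkerPool instances; `configure()` creates a `__default` pool unless `useDefaultPool` is false, and the restart, broadcast, and shutdown paths now fan out per pool. A hedged sketch of the resulting API — the 'api' key and its overrides are illustrative, and it assumes WorkerPool#configure accepts a Configuration instance (as the default-pool call does) and that `run()` starts every pool in the map:

```js
const master = require('luster');  // in the master process this resolves to the Master instance

master.configure({app: 'worker.js', workers: 2}, true, __dirname);
// configure() also creates the '__default' pool (skipped if useDefaultPool is false)

const apiPool = master.createPool('api');               // emits 'create pool'
apiPool.configure(master.config.extend({workers: 1}));  // per-pool config via the new Configuration#extend

master.getPool('api') === apiPool;  // true
master.getDefaultPool();            // the pool keyed '__default'
// createPool('api') again throws POOL_KEY_ALREADY_TAKEN; removePool('nope') throws POOL_DOES_NOT_EXIST

master.run();  // runs every pool in the map
```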
@@ -18,3 +18,3 @@ const EventEmitterEx = require('./event_emitter_ex'), | ||
class RestartQueue extends EventEmitterEx { | ||
constructor() { | ||
constructor(eexKey = undefined) { | ||
super(); | ||
@@ -27,2 +27,4 @@ | ||
this._queue = []; | ||
this.eexKey = eexKey; | ||
} | ||
@@ -29,0 +31,0 @@ |
@@ -38,2 +38,3 @@ const cluster = require('cluster'), | ||
this.port = options.port; | ||
this.workerEnv = options.workerEnv; | ||
} | ||
@@ -70,3 +71,3 @@ | ||
* @augments EventEmitterEx | ||
* @param {Master} master | ||
* @param {WorkerPool} pool | ||
* @param {WorkerWrapperOptions} options | ||
@@ -80,3 +81,3 @@ * | ||
class WorkerWrapper extends EventEmitterEx { | ||
constructor(master, options) { | ||
constructor(pool, options) { | ||
super(); | ||
@@ -106,2 +107,3 @@ | ||
this._wid = ++nextId; | ||
this.eexKey = this._wid; | ||
@@ -158,6 +160,6 @@ /** | ||
/** | ||
* @type {Master} | ||
* @type {WorkerPool} | ||
* @private | ||
*/ | ||
this._master = master; | ||
this._pool = pool; | ||
@@ -174,3 +176,3 @@ /** | ||
WorkerWrapper._RPC_EVENTS.forEach(event => { | ||
master.on('received worker ' + event, WorkerWrapper.createEventTranslator(event).bind(this)); | ||
pool.master.on('received worker ' + event, this.eventTranslator.bind(this, event)); | ||
}); | ||
@@ -213,2 +215,12 @@ this.on('ready', this._onReady.bind(this)); | ||
/** | ||
* Pool this WorkerWrapper belongs to | ||
* @memberOf {WorkerWrapper} | ||
* @public | ||
* @readonly | ||
*/ | ||
get pool() { | ||
return this._pool; | ||
} | ||
/** | ||
* @see WorkerWrapper.STATES for possible `state` argument values | ||
@@ -227,8 +239,6 @@ * @param {WorkerWrapperState} state | ||
static createEventTranslator(event) { | ||
return /** @this {WorkerWrapper} */function(worker) { | ||
if (this._worker && worker.id === this._worker.id) { | ||
this.emit(event); | ||
} | ||
}; | ||
eventTranslator(event, worker) { | ||
if (this._worker && worker.id === this._worker.id) { | ||
this.emit(event); | ||
} | ||
} | ||
@@ -392,6 +402,6 @@ | ||
/** @private */ | ||
this._worker = cluster.fork({ | ||
this._worker = cluster.fork(Object.assign({ | ||
port: this.options.port, | ||
LUSTER_WID: this.wid, | ||
}); | ||
}, this.options.workerEnv)); | ||
@@ -561,7 +571,7 @@ /** @private */ | ||
/** | ||
* Adds this worker to master's restart queue | ||
* Adds this worker to pool's restart queue | ||
* @public | ||
*/ | ||
softRestart() { | ||
this._master.scheduleWorkerRestart(this); | ||
this._pool.scheduleWorkerRestart(this); | ||
} | ||
@@ -568,0 +578,0 @@ } |
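Note on the `workerEnv` change above: the extra option is merged into the environment object passed to `cluster.fork()`, next to the existing `port` and `LUSTER_WID` entries. The merge amounts to the following (values are illustrative; WorkerWrapper instances are normally created by a WorkerPool rather than by hand):

```js
const options = {
    port: 0,
    workerEnv: {POOL_NAME: 'api', LOG_LEVEL: 'debug'},
};
const wid = 3;

// what cluster.fork() now receives as its env argument
const forkEnv = Object.assign({
    port: options.port,
    LUSTER_WID: wid,
}, options.workerEnv);
// => { port: 0, LUSTER_WID: 3, POOL_NAME: 'api', LOG_LEVEL: 'debug' }
// i.e. per-pool environment variables reach the forked worker's process.env
```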
@@ -7,2 +7,4 @@ const cluster = require('cluster'), | ||
const pEvent = require('p-event'); | ||
const wid = parseInt(process.env.LUSTER_WID, 10); | ||
@@ -19,9 +21,7 @@ | ||
this.eexKey = wid; | ||
const broadcastEvent = this._broadcastEvent; | ||
this._foreignPropertiesReceivedPromise = new Promise(resolve => { | ||
this.once('foreign properties received', () => { | ||
resolve(); | ||
}); | ||
}); | ||
this._foreignPropertiesReceivedPromise = pEvent(this, 'foreign properties received'); | ||
@@ -147,3 +147,9 @@ this.on('configured', broadcastEvent.bind(this, 'configured')); | ||
require(workerBase); | ||
try { | ||
require(workerBase); | ||
} catch (e) { | ||
console.error(`Worker failed on require ${workerBase}`); | ||
console.error(e); | ||
process.exit(1); | ||
} | ||
this.emit('loaded', workerBase); | ||
@@ -150,0 +156,0 @@ |
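Note on the worker.js guard above: a worker whose application module fails to load now logs the error and exits with code 1, so the failure surfaces on the master as a 'worker exit' instead of an uncaught exception inside the worker. An illustrative master setup — the config shape follows luster's documented usage, the module path is deliberately broken, and the log text is the one added in the hunk:

```js
// master.js
require('luster')
    .configure({
        app: './worker-that-throws.js',  // a module whose top-level code throws on require
        workers: 1,
    }, true, __dirname)
    .run();

// each forked worker prints something like:
//   Worker failed on require /abs/path/worker-that-throws.js
//   Error: ...
// and exits with code 1, which the master observes as a 'worker exit' event
```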
{ | ||
"name": "luster", | ||
"version": "3.0.0-alpha.2", | ||
"version": "3.0.0-alpha.3", | ||
"description": "Node.js cluster wrapper", | ||
@@ -39,2 +39,4 @@ "main": "./lib/luster.js", | ||
"dependencies": { | ||
"delay": "^3.0.0", | ||
"p-event": "^2.1.0", | ||
"terror": "^1.0.0" | ||
@@ -44,2 +46,3 @@ }, | ||
"chai": "^3.5.0", | ||
"chai-as-promised": "^7.1.1", | ||
"eslint": "^4.19.1", | ||
@@ -46,0 +49,0 @@ "eslint-config-nodules": "^0.4.0", |
License Policy Violation
License: This package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
License: This package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
New author
Supply chain risk: A new npm collaborator published a version of the package for the first time. New collaborators are usually benign additions to a project, but do indicate a change to the security surface area of a package.
Found 1 instance in 1 package
+ Added delay@^3.0.0
+ Added p-event@^2.1.0
+ Added delay@3.1.0 (transitive)
+ Added p-event@2.3.1 (transitive)
+ Added p-finally@1.0.0 (transitive)
+ Added p-timeout@2.0.1 (transitive)