Comparing version 3.0.0-alpha.1 to 3.0.0-alpha.2
#!/usr/bin/env node | ||
const /** @type {ClusterProcess} */ | ||
luster = require('../lib/luster'), | ||
path = require('path'), | ||
configFilePath = path.resolve(process.cwd(), process.argv[2] || 'luster.conf'); | ||
path = require('path'); | ||
// config path is right after this script in process.argv | ||
const scriptArgvIndex = process.argv.findIndex(arg => arg === __filename || path.resolve(arg) === __filename); | ||
const configFilePath = path.resolve(process.cwd(), process.argv[scriptArgvIndex + 1] || 'luster.conf'); | ||
luster.configure(require(configFilePath), true, path.dirname(configFilePath)).run(); |
@@ -10,4 +10,2 @@ const cluster = require('cluster'), | ||
const pEvent = require('p-event'); | ||
/** | ||
@@ -65,3 +63,5 @@ * @param {Object} context | ||
* */ | ||
this._initPromise = pEvent(this, 'initialized'); | ||
this._initPromise = new Promise(resolve => { | ||
this.once('initialized', resolve); | ||
}); | ||
@@ -68,0 +68,0 @@ /** |
@@ -23,6 +23,2 @@ const path = require('path'), | ||
get raw() { | ||
return this._rawConfig; | ||
} | ||
/** | ||
@@ -101,12 +97,2 @@ * @param {String} path | ||
extend(config) { | ||
return new Configuration( | ||
{ | ||
...this.raw, | ||
...config, | ||
}, | ||
this._resolveBaseDir, | ||
); | ||
} | ||
/** | ||
@@ -113,0 +99,0 @@ * Override config properties using `LUSTER_CONF` environment variable. |
@@ -97,13 +97,2 @@ const Terror = require('terror'), | ||
/** | ||
* @constructor | ||
* @class LusterMasterError | ||
* @augments LusterError | ||
*/ | ||
errors.LusterMasterError = LusterError.create('LusterMasterError', | ||
{ | ||
POOL_KEY_ALREADY_TAKEN: | ||
'Pool key "%key%" is already taken' | ||
}); | ||
module.exports = errors; |
@@ -20,6 +20,4 @@ const util = require('util'), | ||
const key = this.eexKey; | ||
console.log('%s(%s).emit(%s)', this.constructor.name || 'EventEmitterEx', this.wid, inspectedArgs); | ||
console.log('%s(%s).emit(%s)', this.constructor.name || 'EventEmitterEx', key, inspectedArgs); | ||
return EventEmitter.prototype.emit.apply(this, args); | ||
@@ -26,0 +24,0 @@ }; |
@@ -1,11 +0,9 @@ | ||
const cluster = require('cluster'); | ||
const os = require('os'), | ||
cluster = require('cluster'), | ||
ClusterProcess = require('./cluster_process'), | ||
WorkerWrapper = require('./worker_wrapper'), | ||
Port = require('./port'), | ||
RestartQueue = require('./restart_queue'), | ||
RPC = require('./rpc'); | ||
const ClusterProcess = require('./cluster_process'); | ||
const LusterMasterError = require('./errors').LusterMasterError; | ||
const RPC = require('./rpc'); | ||
const WorkerPool = require('./worker-pool'); | ||
const WorkerWrapper = require('./worker_wrapper'); | ||
const DEFAULT_POOL_KEY = '__default'; | ||
/** | ||
@@ -21,2 +19,17 @@ * @constructor | ||
/** | ||
* @type {Object} | ||
* @property {WorkerWrapper} * | ||
* @public | ||
* @todo make it private or public immutable | ||
*/ | ||
this.workers = {}; | ||
/** | ||
* Workers restart queue. | ||
* @type {RestartQueue} | ||
* @private | ||
*/ | ||
this._restartQueue = new RestartQueue(); | ||
/** | ||
* Configuration object to pass to cluster.setupMaster() | ||
@@ -28,10 +41,9 @@ * @type {Object} | ||
this.pools = new Map(); | ||
this.createPool(DEFAULT_POOL_KEY); | ||
this.id = 0; | ||
this.wid = 0; | ||
this.eexKey = 0; | ||
this.pid = process.pid; | ||
this.on('worker state', this._cleanupUnixSockets.bind(this)); | ||
this.on('worker exit', this._checkWorkersAlive.bind(this)); | ||
// @todo make it optional? | ||
@@ -42,22 +54,2 @@ process.on('SIGINT', this._onSignalQuit.bind(this)); | ||
createPool(key) { | ||
if (this.pools.has(key)) { | ||
throw LusterMasterError.createError( | ||
LusterMasterError.CODES.POOL_KEY_ALREADY_TAKEN, | ||
{key} | ||
); | ||
} | ||
this.emit('create pool', key); | ||
const pool = new WorkerPool(key, this); | ||
this._proxyWorkerEvents(pool); | ||
pool.on('shutdown', this._checkPoolsAlive.bind(this)); | ||
this.pools.set(key, pool); | ||
return pool; | ||
} | ||
getPool(key) { | ||
return this.pools.get(key); | ||
} | ||
/** | ||
@@ -86,10 +78,43 @@ * Allows same object structure as cluster.setupMaster(). | ||
/** | ||
* Remove not used unix socket before worker will try to listen it. | ||
* @param {WorkerWrapper} worker | ||
* @param {WorkerWrapperState} state | ||
* @private | ||
*/ | ||
_cleanupUnixSockets(worker, state) { | ||
const port = worker.options.port; | ||
if (this._restartQueue.has(worker) || | ||
state !== WorkerWrapper.STATES.LAUNCHING || | ||
port.family !== Port.UNIX) { | ||
return; | ||
} | ||
const inUse = this.getWorkersArray().some(w => | ||
worker.wid !== w.wid && | ||
w.isRunning() && | ||
port.isEqualTo(w.options.port) | ||
); | ||
if (!inUse) { | ||
port.unlink(err => { | ||
if (err) { | ||
this.emit('error', err); | ||
} | ||
}); | ||
} | ||
} | ||
/** | ||
* Check for alive workers, if no one here, then emit "shutdown". | ||
* @private | ||
*/ | ||
_checkPoolsAlive() { | ||
let dead = true; | ||
this.forEachPool(pool => dead = dead && pool.dead); | ||
_checkWorkersAlive() { | ||
const workers = this.getWorkersArray(), | ||
alive = workers.reduce( | ||
(count, w) => w.dead ? count - 1 : count, | ||
workers.length | ||
); | ||
if (dead) { | ||
if (alive === 0) { | ||
this.emit('shutdown'); | ||
@@ -100,12 +125,13 @@ } | ||
/** | ||
* Repeat WorkerWrapper events from WorkerPool on Master | ||
* Repeat WorkerWrapper events on Master and add 'worker ' prefix to event names | ||
* so for example 'online' became 'worker online' | ||
* @private | ||
* @param {WorkerPool} pool | ||
* @param {WorkerWrapper} worker | ||
*/ | ||
_proxyWorkerEvents(pool) { | ||
for (const eventName of WorkerWrapper.EVENTS) { | ||
const proxyEventName = 'worker ' + eventName; | ||
pool.on(proxyEventName, this.emit.bind(this, proxyEventName)); | ||
} | ||
_proxyWorkerEvents(worker) { | ||
WorkerWrapper.EVENTS | ||
.forEach(eventName => { | ||
const proxyEventName = 'worker ' + eventName; | ||
worker.on(eventName, this.emit.bind(this, proxyEventName, worker)); | ||
}); | ||
} | ||
@@ -117,3 +143,7 @@ | ||
getWorkersIds() { | ||
return this.getWorkersArray().map(w => w.wid); | ||
if (!this._workersIdsCache) { | ||
this._workersIdsCache = this.getWorkersArray().map(w => w.wid); | ||
} | ||
return this._workersIdsCache; | ||
} | ||
@@ -125,13 +155,24 @@ | ||
getWorkersArray() { | ||
let result = []; | ||
this.forEachPool( | ||
pool => result = result.concat(pool.getWorkersArray()) | ||
); | ||
return result; | ||
if (!this._workersArrayCache) { | ||
this._workersArrayCache = Object.values(this.workers); | ||
} | ||
return this._workersArrayCache; | ||
} | ||
forEachPool(fn) { | ||
for (const pool of this.pools.values()) { | ||
fn(pool); | ||
} | ||
/** | ||
* Add worker to the pool | ||
* @param {WorkerWrapper} worker | ||
* @returns {Master} self | ||
* @public | ||
*/ | ||
add(worker) { | ||
// invalidate Master#getWorkersIds and Master#getWorkersArray cache | ||
this._workersIdsCache = null; | ||
this._workersArrayCache = null; | ||
this.workers[worker.wid] = worker; | ||
this._proxyWorkerEvents(worker); | ||
return this; | ||
} | ||
@@ -149,5 +190,4 @@ | ||
forEach(fn) { | ||
this.forEachPool(pool => { | ||
pool.forEach(fn); | ||
}); | ||
this.getWorkersArray() | ||
.forEach(fn); | ||
@@ -183,3 +223,43 @@ return this; | ||
this.pools.get(DEFAULT_POOL_KEY).configure(this.config); | ||
const // WorkerWrapper options | ||
forkTimeout = this.config.get('control.forkTimeout'), | ||
stopTimeout = this.config.get('control.stopTimeout'), | ||
exitThreshold = this.config.get('control.exitThreshold'), | ||
allowedSequentialDeaths = this.config.get('control.allowedSequentialDeaths'), | ||
count = this.config.get('workers', os.cpus().length), | ||
isServerPortSet = this.config.has('server.port'), | ||
groups = this.config.get('server.groups', 1), | ||
workersPerGroup = Math.floor(count / groups); | ||
let port, | ||
// workers and groups count | ||
i = 0, | ||
group = 0, | ||
workersInGroup = 0; | ||
if (isServerPortSet) { | ||
port = new Port(this.config.get('server.port')); | ||
} | ||
// create pool of workers | ||
while (count > i++) { | ||
this.add(new WorkerWrapper(this, { | ||
forkTimeout, | ||
stopTimeout, | ||
exitThreshold, | ||
allowedSequentialDeaths, | ||
port: isServerPortSet ? port.next(group) : 0, | ||
maxListeners: this.getMaxListeners(), | ||
})); | ||
// groups > 1, current group is full and | ||
// last workers can form at least more one group | ||
if (groups > 1 && | ||
++workersInGroup >= workersPerGroup && | ||
count - (group + 1) * workersPerGroup >= workersPerGroup) { | ||
workersInGroup = 0; | ||
group++; | ||
} | ||
} | ||
} | ||
@@ -194,8 +274,19 @@ | ||
waitForWorkers(wids, event) { | ||
const promises = []; | ||
this.forEachPool( | ||
pool => promises.push(pool.waitForWorkers(wids, event)) | ||
); | ||
const pendingWids = new Set(wids); | ||
return Promise.all(promises); | ||
return new Promise(resolve => { | ||
if (pendingWids.size === 0) { | ||
resolve(); | ||
} | ||
const onWorkerState = worker => { | ||
const wid = worker.wid; | ||
pendingWids.delete(wid); | ||
if (pendingWids.size === 0) { | ||
this.removeListener(event, onWorkerState); | ||
resolve(); | ||
} | ||
}; | ||
this.on(event, onWorkerState); | ||
}); | ||
} | ||
@@ -225,3 +316,3 @@ | ||
// TODO maybe run this after starting waitForAllWorkers | ||
this.forEachPool(pool => pool.restart()); | ||
this.forEach(worker => worker.restart()); | ||
@@ -245,12 +336,2 @@ await this.waitForAllWorkers('worker ready'); | ||
async _softRestart() { | ||
const promises = []; | ||
this.forEachPool( | ||
pool => promises.push(pool.softRestart()) | ||
); | ||
await Promise.all(promises); | ||
this.emit('restarted'); | ||
} | ||
/** | ||
@@ -265,3 +346,4 @@ * Workers will be restarted one by one using RestartQueue. | ||
softRestart() { | ||
this._softRestart(); | ||
this.forEach(worker => worker.softRestart()); | ||
this._restartQueue.once('drain', this.emit.bind(this, 'restarted')); | ||
return this; | ||
@@ -271,2 +353,15 @@ } | ||
/** | ||
* Schedules one worker restart using RestartQueue. | ||
* If a worker becomes dead, it will be just removed from restart queue. However, if already dead worker is pushed | ||
* into the queue, it will emit 'error' on restart. | ||
* @public | ||
* @param {WorkerWrapper} worker | ||
* @returns {Master} self | ||
*/ | ||
scheduleWorkerRestart(worker) { | ||
this._restartQueue.push(worker); | ||
return this; | ||
} | ||
/** | ||
* @override | ||
@@ -288,3 +383,11 @@ * @see ClusterProcess | ||
remoteCallToAll(name, ...args) { | ||
this.forEachPool(pool => pool.remoteCallToAll(name, ...args)); | ||
this.forEach(worker => { | ||
if (worker.ready) { | ||
worker.remoteCall(name, ...args); | ||
} else { | ||
worker.on('ready', () => { | ||
worker.remoteCall(name, ...args); | ||
}); | ||
} | ||
}); | ||
} | ||
@@ -300,3 +403,7 @@ | ||
broadcastEventToAll(event, ...args) { | ||
this.forEachPool(pool => pool.broadcastEventToAll(event, ...args)); | ||
this.forEach(worker => { | ||
if (worker.ready) { | ||
worker.broadcastEvent(event, ...args); | ||
} | ||
}); | ||
} | ||
@@ -313,3 +420,3 @@ | ||
this.emit(event, ...args); | ||
this.forEachPool(pool => pool.emitToAll(event, ...args)); | ||
this.broadcastEventToAll(event, ...args); | ||
} | ||
@@ -322,9 +429,16 @@ | ||
async _shutdown() { | ||
const promises = []; | ||
this.forEachPool( | ||
pool => promises.push(pool._shutdown()) | ||
const stoppedWorkers = []; | ||
this.forEach(worker => { | ||
if (worker.isRunning()) { | ||
worker.stop(); | ||
stoppedWorkers.push(worker.wid); | ||
} | ||
}); | ||
await this.waitForWorkers( | ||
stoppedWorkers, | ||
'worker exit', | ||
); | ||
await Promise.all(promises); | ||
this.emit('shutdown'); | ||
@@ -353,3 +467,7 @@ } | ||
remoteCallToAllWithCallback(opts) { | ||
this.forEachPool(pool => pool.remoteCallToAllWithCallback(opts)); | ||
this.forEach(worker => { | ||
if (worker.isRunning()) { | ||
worker.remoteCallWithCallback(opts); | ||
} | ||
}); | ||
} | ||
@@ -363,3 +481,3 @@ | ||
// TODO maybe run this after starting waitForAllWorkers | ||
this.forEachPool(pool => pool.run()); | ||
this.forEach(worker => worker.run()); | ||
@@ -366,0 +484,0 @@ await this.waitForAllWorkers('worker ready'); |
@@ -18,3 +18,3 @@ const EventEmitterEx = require('./event_emitter_ex'), | ||
class RestartQueue extends EventEmitterEx { | ||
constructor(eexKey = undefined) { | ||
constructor() { | ||
super(); | ||
@@ -27,4 +27,2 @@ | ||
this._queue = []; | ||
this.eexKey = eexKey; | ||
} | ||
@@ -31,0 +29,0 @@ |
@@ -38,3 +38,2 @@ const cluster = require('cluster'), | ||
this.port = options.port; | ||
this.workerEnv = options.workerEnv; | ||
} | ||
@@ -71,3 +70,3 @@ | ||
* @augments EventEmitterEx | ||
* @param {WorkerPool} pool | ||
* @param {Master} master | ||
* @param {WorkerWrapperOptions} options | ||
@@ -81,3 +80,3 @@ * | ||
class WorkerWrapper extends EventEmitterEx { | ||
constructor(pool, options) { | ||
constructor(master, options) { | ||
super(); | ||
@@ -107,3 +106,2 @@ | ||
this._wid = ++nextId; | ||
this.eexKey = this._wid; | ||
@@ -160,6 +158,6 @@ /** | ||
/** | ||
* @type {WorkerPool} | ||
* @type {Master} | ||
* @private | ||
*/ | ||
this._pool = pool; | ||
this._master = master; | ||
@@ -176,3 +174,3 @@ /** | ||
WorkerWrapper._RPC_EVENTS.forEach(event => { | ||
pool.master.on('received worker ' + event, this.eventTranslator.bind(this, event)); | ||
master.on('received worker ' + event, WorkerWrapper.createEventTranslator(event).bind(this)); | ||
}); | ||
@@ -215,12 +213,2 @@ this.on('ready', this._onReady.bind(this)); | ||
/** | ||
* Pool this WorkerWrapper belongs to | ||
* @memberOf {WorkerWrapper} | ||
* @public | ||
* @readonly | ||
*/ | ||
get pool() { | ||
return this._pool; | ||
} | ||
/** | ||
* @see WorkerWrapper.STATES for possible `state` argument values | ||
@@ -239,6 +227,8 @@ * @param {WorkerWrapperState} state | ||
eventTranslator(event, worker) { | ||
if (this._worker && worker.id === this._worker.id) { | ||
this.emit(event); | ||
} | ||
static createEventTranslator(event) { | ||
return /** @this {WorkerWrapper} */function(worker) { | ||
if (this._worker && worker.id === this._worker.id) { | ||
this.emit(event); | ||
} | ||
}; | ||
} | ||
@@ -402,6 +392,6 @@ | ||
/** @private */ | ||
this._worker = cluster.fork(Object.assign({ | ||
this._worker = cluster.fork({ | ||
port: this.options.port, | ||
LUSTER_WID: this.wid, | ||
}, this.options.workerEnv)); | ||
}); | ||
@@ -571,7 +561,7 @@ /** @private */ | ||
/** | ||
* Adds this worker to pool's restart queue | ||
* Adds this worker to master's restart queue | ||
* @public | ||
*/ | ||
softRestart() { | ||
this._pool.scheduleWorkerRestart(this); | ||
this._master.scheduleWorkerRestart(this); | ||
} | ||
@@ -578,0 +568,0 @@ } |
@@ -7,4 +7,2 @@ const cluster = require('cluster'), | ||
const pEvent = require('p-event'); | ||
const wid = parseInt(process.env.LUSTER_WID, 10); | ||
@@ -21,7 +19,9 @@ | ||
this.eexKey = wid; | ||
const broadcastEvent = this._broadcastEvent; | ||
this._foreignPropertiesReceivedPromise = pEvent(this, 'foreign properties received'); | ||
this._foreignPropertiesReceivedPromise = new Promise(resolve => { | ||
this.once('foreign properties received', () => { | ||
resolve(); | ||
}); | ||
}); | ||
@@ -147,9 +147,3 @@ this.on('configured', broadcastEvent.bind(this, 'configured')); | ||
try { | ||
require(workerBase); | ||
} catch (e) { | ||
console.error(`Worker failed on require ${workerBase}`); | ||
console.error(e); | ||
process.exit(1); | ||
} | ||
require(workerBase); | ||
this.emit('loaded', workerBase); | ||
@@ -156,0 +150,0 @@ |
{ | ||
"name": "luster", | ||
"version": "3.0.0-alpha.1", | ||
"version": "3.0.0-alpha.2", | ||
"description": "Node.js cluster wrapper", | ||
@@ -39,4 +39,2 @@ "main": "./lib/luster.js", | ||
"dependencies": { | ||
"delay": "^3.0.0", | ||
"p-event": "^2.1.0", | ||
"terror": "^1.0.0" | ||
@@ -46,3 +44,2 @@ }, | ||
"chai": "^3.5.0", | ||
"chai-as-promised": "^7.1.1", | ||
"eslint": "^4.19.1", | ||
@@ -49,0 +46,0 @@ "eslint-config-nodules": "^0.4.0", |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
New author
Supply chain riskA new npm collaborator published a version of the package for the first time. New collaborators are usually benign additions to a project, but do indicate a change to the security surface area of a package.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
1
7
86129
19
2182
1
- Removeddelay@^3.0.0
- Removedp-event@^2.1.0
- Removeddelay@3.1.0(transitive)
- Removedp-event@2.3.1(transitive)
- Removedp-finally@1.0.0(transitive)
- Removedp-timeout@2.0.1(transitive)