Comparing version 2.0.1 to 3.0.0-alpha.0
#!/usr/bin/env node | ||
var /** @type {ClusterProcess} */ | ||
const /** @type {ClusterProcess} */ | ||
luster = require('../lib/luster'), | ||
@@ -4,0 +4,0 @@ path = require('path'), |
@@ -1,2 +0,2 @@ | ||
var cluster = require('cluster'), | ||
const cluster = require('cluster'), | ||
path = require('path'), | ||
@@ -8,5 +8,6 @@ RPC = require('./rpc'), | ||
LusterClusterProcessError = require('./errors').LusterClusterProcessError, | ||
LusterConfigurationError = require('./errors').LusterConfigurationError, | ||
ClusterProcess; | ||
LusterConfigurationError = require('./errors').LusterConfigurationError; | ||
const pEvent = require('p-event'); | ||
/** | ||
@@ -22,2 +23,23 @@ * @param {Object} context | ||
/** | ||
* Add `basedir`, `node_modules` contained in the `basedir` and its ancestors to `module.paths` | ||
* @param {String} basedir | ||
*/ | ||
function extendResolvePath(basedir) { | ||
// using module internals isn't good, but restarting with corrected NODE_PATH looks more ugly, IMO | ||
module.paths.push(basedir); | ||
const _basedir = basedir.split('/'), | ||
size = basedir.length; | ||
let i = 0; | ||
while (size > i++) { | ||
const modulesPath = _basedir.slice(0, i).join('/') + '/node_modules'; | ||
if (module.paths.indexOf(modulesPath) === -1) { | ||
module.paths.push(modulesPath); | ||
} | ||
} | ||
} | ||
/** | ||
* @typedef Extension | ||
@@ -32,311 +54,275 @@ * @property {Function} [configure] (Object config, ClusterProcess proc) | ||
*/ | ||
ClusterProcess = EventEmitterEx.create(function ClusterProcess() { | ||
ClusterProcess.__super.apply(this, arguments); | ||
class ClusterProcess extends EventEmitterEx { | ||
constructor() { | ||
super(); | ||
/** @private */ | ||
this._remoteCommands = {}; | ||
/** @private */ | ||
this._initialized = false; | ||
/** @private */ | ||
this.extensions = {}; | ||
/** @private */ | ||
this._remoteCommands = {}; | ||
/** @private */ | ||
this.extensions = {}; | ||
/** | ||
* @type Promise<void> | ||
* @private | ||
* */ | ||
this._initPromise = pEvent(this, 'initialized'); | ||
/** | ||
* @type {Configuration} | ||
* @public | ||
*/ | ||
this.config = null; | ||
/** | ||
* @type {Configuration} | ||
* @public | ||
*/ | ||
this.config = null; | ||
this.once('configured', this._onConfigured.bind(this)); | ||
this.once('configured', this._onConfigured.bind(this)); | ||
this._setupIPCMessagesHandler(); | ||
this._setupIPCMessagesHandler(); | ||
this.registerRemoteCommand(RPC.fns.callback, RPCCallback.processCallback.bind(RPCCallback)); | ||
}); | ||
this.registerRemoteCommand(RPC.fns.callback, RPCCallback.processCallback.bind(RPCCallback)); | ||
} | ||
/** | ||
* @memberOf ClusterProcess | ||
* @property {Boolean} isMaster | ||
* @readonly | ||
* @public | ||
*/ | ||
Object.defineProperty(ClusterProcess.prototype, 'isMaster', { | ||
value: cluster.isMaster, | ||
enumerable: true | ||
}); | ||
/** | ||
* @memberOf ClusterProcess | ||
* @property {Boolean} isWorker | ||
* @readonly | ||
* @public | ||
*/ | ||
Object.defineProperty(ClusterProcess.prototype, 'isWorker', { | ||
value: cluster.isWorker, | ||
enumerable: true | ||
}); | ||
/** | ||
* @event ClusterProcess#configured | ||
*/ | ||
/** | ||
* @fires ClusterProcess#configured | ||
* @param {Object} config | ||
* @param {Boolean} [applyEnv=true] | ||
* @param {String} [basedir=process.cwd()] for Configuration#resolve relative paths | ||
* @returns {ClusterProcess} this | ||
* @throws {LusterConfigurationError} if configuration check failed (check errors will be logged to STDERR) | ||
* @public | ||
*/ | ||
ClusterProcess.prototype.configure = function(config, applyEnv, basedir) { | ||
if (typeof applyEnv === 'undefined' || applyEnv) { | ||
Configuration.applyEnvironment(config); | ||
/** | ||
* @memberOf ClusterProcess | ||
* @property {Boolean} isMaster | ||
* @readonly | ||
* @public | ||
*/ | ||
get isMaster() { | ||
return cluster.isMaster; | ||
} | ||
if (typeof(basedir) === 'undefined') { | ||
basedir = process.cwd(); | ||
/** | ||
* @memberOf ClusterProcess | ||
* @property {Boolean} isWorker | ||
* @readonly | ||
* @public | ||
*/ | ||
get isWorker() { | ||
return cluster.isWorker; | ||
} | ||
if (Configuration.check(config) > 0) { | ||
this.emit('error', | ||
LusterConfigurationError.createError( | ||
LusterConfigurationError.CODES.CONFIGURATION_CHECK_FAILED)); | ||
} else { | ||
this.config = Configuration.extend(config, basedir); | ||
/** | ||
* @event ClusterProcess#configured | ||
*/ | ||
// hack to tweak underlying EventEmitter max listeners | ||
// if your luster-based app extensively use luster events | ||
this.setMaxListeners(this.config.get('maxEventListeners', 100)); | ||
/** | ||
* @fires ClusterProcess#configured | ||
* @param {Object} config | ||
* @param {Boolean} [applyEnv=true] | ||
* @param {String} [basedir=process.cwd()] for Configuration#resolve relative paths | ||
* @returns {ClusterProcess} this | ||
* @throws {LusterConfigurationError} if configuration check failed (check errors will be logged to STDERR) | ||
* @public | ||
*/ | ||
configure(config, applyEnv, basedir) { | ||
if (typeof applyEnv === 'undefined' || applyEnv) { | ||
Configuration.applyEnvironment(config); | ||
} | ||
this.emit('configured'); | ||
} | ||
if (typeof(basedir) === 'undefined') { | ||
basedir = process.cwd(); | ||
} | ||
return this; | ||
}; | ||
if (Configuration.check(config) > 0) { | ||
this.emit('error', | ||
LusterConfigurationError.createError( | ||
LusterConfigurationError.CODES.CONFIGURATION_CHECK_FAILED)); | ||
} else { | ||
this.config = Configuration.extend(config, basedir); | ||
/** | ||
* @param {String} name | ||
* @param {Function} callback function(error) | ||
*/ | ||
ClusterProcess.prototype.loadExtension = function(name, callback) { | ||
var /** @type Extension */ | ||
extension = require(name), | ||
self = this, | ||
config = this.config.get('extensions.' + name); | ||
// hack to tweak underlying EventEmitter max listeners | ||
// if your luster-based app extensively use luster events | ||
this.setMaxListeners(this.config.get('maxEventListeners', 100)); | ||
this.extensions[name] = extension; | ||
this.emit('configured'); | ||
} | ||
// if `config` was an Object then it became instance of Configuration | ||
// else returns original value | ||
config = Configuration.extend(config, this.config.getBaseDir()); | ||
if (extension.configure.length > 2) { | ||
setImmediate(function() { | ||
extension.configure(config, self, callback); | ||
}); | ||
} else { | ||
setImmediate(function() { | ||
extension.configure(config, self); | ||
callback(); | ||
}); | ||
return this; | ||
} | ||
}; | ||
/** | ||
* Add `basedir`, `node_modules` contained in the `basedir` and its ancestors to `module.paths` | ||
* @param {String} basedir | ||
*/ | ||
function extendResolvePath(basedir) { | ||
// using module internals isn't good, but restarting with corrected NODE_PATH looks more ugly, IMO | ||
module.paths.push(basedir); | ||
/** | ||
* @param {String} name | ||
* @param {Function} callback function(error) | ||
*/ | ||
loadExtension(name, callback) { | ||
const /** @type Extension */ | ||
extension = require(name); | ||
let config = this.config.get('extensions.' + name); | ||
var _basedir = basedir.split('/'), | ||
size = basedir.length, | ||
i = 0, | ||
modulesPath; | ||
this.extensions[name] = extension; | ||
while (size > i++) { | ||
modulesPath = _basedir.slice(0, i).join('/') + '/node_modules'; | ||
// if `config` was an Object then it became instance of Configuration | ||
// else returns original value | ||
config = Configuration.extend(config, this.config.getBaseDir()); | ||
if (module.paths.indexOf(modulesPath) === -1) { | ||
module.paths.push(modulesPath); | ||
if (extension.configure.length > 2) { | ||
setImmediate(() => extension.configure(config, this, callback)); | ||
} else { | ||
setImmediate(() => { | ||
extension.configure(config, this); | ||
callback(); | ||
}); | ||
} | ||
} | ||
} | ||
/** | ||
* @event ClusterProcess#initialized | ||
*/ | ||
/** | ||
* @event ClusterProcess#initialized | ||
*/ | ||
/** | ||
* @fires ClusterProcess#initialized | ||
* @private | ||
*/ | ||
ClusterProcess.prototype._onConfigured = function() { | ||
cluster.setMaxListeners(this.getMaxListeners()); | ||
/** | ||
* @fires ClusterProcess#initialized | ||
* @private | ||
*/ | ||
_onConfigured() { | ||
cluster.setMaxListeners(this.getMaxListeners()); | ||
// try to use `extensionsPath` option to resolve extensions' modules | ||
// use worker file directory as fallback | ||
extendResolvePath(path.resolve( | ||
this.config.resolve('extensionsPath', path.dirname(this.config.resolve('app'))) | ||
)); | ||
// try to use `extensionsPath` option to resolve extensions' modules | ||
// use worker file directory as fallback | ||
extendResolvePath(path.resolve( | ||
this.config.resolve('extensionsPath', path.dirname(this.config.resolve('app'))) | ||
)); | ||
var self = this, | ||
extensions = this.config.getKeys('extensions'), | ||
wait = extensions.length, | ||
loadedExtensions = [], | ||
loadTimeout = this.config.get('extensionsLoadTimeout', 10000), | ||
loadTimer; | ||
const extensions = this.config.getKeys('extensions'), | ||
wait = extensions.length, | ||
loadedExtensions = new Set(), | ||
loadTimeout = this.config.get('extensionsLoadTimeout', 10000); | ||
let loadTimer; | ||
if (wait === 0) { | ||
self._initialized = true; | ||
self.emit('initialized'); | ||
return; | ||
} | ||
if (wait === 0) { | ||
this.emit('initialized'); | ||
return; | ||
} | ||
extensions.forEach(function(name) { | ||
this.loadExtension(name, function(error) { | ||
if (error) { | ||
return self.emit('error', error); | ||
} | ||
extensions.forEach(name => { | ||
this.loadExtension(name, error => { | ||
if (error) { | ||
return this.emit('error', error); | ||
} | ||
loadedExtensions.push(name); | ||
self.emit('extension loaded', name); | ||
loadedExtensions.add(name); | ||
this.emit('extension loaded', name); | ||
if (loadedExtensions.length === wait) { | ||
clearTimeout(loadTimer); | ||
self._initialized = true; | ||
self.emit('initialized'); | ||
} | ||
if (loadedExtensions.size === wait) { | ||
clearTimeout(loadTimer); | ||
this.emit('initialized'); | ||
} | ||
}); | ||
}); | ||
}, this); | ||
loadTimer = setTimeout(function() { | ||
var timeouted = extensions.filter(function (name) { | ||
return loadedExtensions.indexOf(name) < 0; | ||
}), | ||
error = LusterClusterProcessError.createError( | ||
LusterClusterProcessError.CODES.EXTENSIONS_LOAD_TIMEOUT, | ||
{ timeouted: timeouted, timeout: loadTimeout }); | ||
loadTimer = setTimeout(() => { | ||
const timeouted = extensions.filter(name => !loadedExtensions.has(name)), | ||
error = LusterClusterProcessError.createError( | ||
LusterClusterProcessError.CODES.EXTENSIONS_LOAD_TIMEOUT, | ||
{timeouted, timeout: loadTimeout}); | ||
self.emit('error', error); | ||
}, loadTimeout); | ||
}; | ||
this.emit('error', error); | ||
}, loadTimeout); | ||
} | ||
/** | ||
* Wrap `fn` to delay it execution to ClusterProcess done initialization. | ||
* @this {ClusterProcess} | ||
* @param {Function} fn | ||
* @returns {Function} | ||
*/ | ||
ClusterProcess.whenInitialized = function(fn) { | ||
return /** @this {ClusterProcess} */function() { | ||
if (this._initialized) { | ||
setImmediate(fn.bind(this)); | ||
} else { | ||
this.once('initialized', fn.bind(this)); | ||
/** | ||
* Resolves when ClusterProcess done initialization. | ||
* @this {ClusterProcess} | ||
* @returns {Promise<void>} | ||
*/ | ||
whenInitialized() { | ||
return this._initPromise; | ||
} | ||
/** | ||
* Register `fn` as allowed for remote call via IPC. | ||
* @param {String} name | ||
* @param {Function} fn | ||
* @throws LusterClusterProcessError if remote procedure with `name` already registered. | ||
* @public | ||
*/ | ||
registerRemoteCommand(name, fn) { | ||
if (has(this._remoteCommands, name)) { | ||
throw LusterClusterProcessError.createError( | ||
LusterClusterProcessError.CODES.REMOTE_COMMAND_ALREADY_REGISTERED, | ||
{name}); | ||
} | ||
return this; | ||
}; | ||
}; | ||
this._remoteCommands[name] = fn; | ||
} | ||
/** | ||
* Register `fn` as allowed for remote call via IPC. | ||
* @param {String} name | ||
* @param {Function} fn | ||
* @throws LusterClusterProcessError if remote procedure with `name` already registered. | ||
* @public | ||
*/ | ||
ClusterProcess.prototype.registerRemoteCommand = function(name, fn) { | ||
if (has(this._remoteCommands, name)) { | ||
throw LusterClusterProcessError.createError( | ||
LusterClusterProcessError.CODES.REMOTE_COMMAND_ALREADY_REGISTERED, | ||
{ name: name }); | ||
/** | ||
* Remove previously registered remote command | ||
* @param {String} name | ||
* @public | ||
*/ | ||
unregisterRemoteCommand(name) { | ||
delete this._remoteCommands[name]; | ||
} | ||
this._remoteCommands[name] = fn; | ||
}; | ||
/** | ||
* Remove previously registered remote command | ||
* @param {String} name | ||
* @public | ||
*/ | ||
ClusterProcess.prototype.unregisterRemoteCommand = function(name) { | ||
delete this._remoteCommands[name]; | ||
}; | ||
/** | ||
* Checks is remote command registered. | ||
* @param {String} name | ||
* @returns {Boolean} | ||
*/ | ||
ClusterProcess.prototype.hasRegisteredRemoteCommand = function(name) { | ||
return has(this._remoteCommands, name); | ||
}; | ||
/** | ||
* @abstract | ||
* @throws LusterClusterProcessError if method is not overriden in the inheritor of ClusterProcess | ||
* @private | ||
*/ | ||
ClusterProcess.prototype._setupIPCMessagesHandler = function() { | ||
throw LusterClusterProcessError.createError( | ||
LusterClusterProcessError.CODES.ABSTRACT_METHOD_IS_NOT_IMPLEMENTED, | ||
{ | ||
method: 'ClusterProcess#_setupIPCMessagesHandler', | ||
klass: this.constructor.name | ||
}); | ||
}; | ||
/** | ||
* Call function registered as remote command if `rawMessage` is valid luster IPC message | ||
* @param {WorkerWrapper|Worker} target object with `remoteCall` method which can be used to respond to message | ||
* @param {*} rawMessage | ||
* @see RPC | ||
* @private | ||
*/ | ||
ClusterProcess.prototype._onMessage = function(target, rawMessage) { | ||
var message = RPC.parseMessage(rawMessage); | ||
if (message === null) { | ||
return; | ||
/** | ||
* Checks is remote command registered. | ||
* @param {String} name | ||
* @returns {Boolean} | ||
*/ | ||
hasRegisteredRemoteCommand(name) { | ||
return has(this._remoteCommands, name); | ||
} | ||
if ( ! has(this._remoteCommands, message.cmd)) { | ||
/** | ||
* @abstract | ||
* @throws LusterClusterProcessError if method is not overriden in the inheritor of ClusterProcess | ||
* @private | ||
*/ | ||
_setupIPCMessagesHandler() { | ||
throw LusterClusterProcessError.createError( | ||
LusterClusterProcessError.CODES.REMOTE_COMMAND_IS_NOT_REGISTERED, | ||
LusterClusterProcessError.CODES.ABSTRACT_METHOD_IS_NOT_IMPLEMENTED, | ||
{ | ||
name: message.cmd, | ||
method: 'ClusterProcess#_setupIPCMessagesHandler', | ||
klass: this.constructor.name | ||
}); | ||
} else if (typeof message.args === 'undefined') { | ||
this._remoteCommands[message.cmd].call(null, target); | ||
} else { | ||
this._remoteCommands[message.cmd].apply(null, [target].concat(message.args)); | ||
} | ||
}; | ||
/** | ||
* Register remote command with respect to the presence of callback | ||
* @param {String} command | ||
* @param {Function} handler | ||
*/ | ||
ClusterProcess.prototype.registerRemoteCommandWithCallback = function(command, handler) { | ||
/** | ||
* @param {ClusterProcess} proc | ||
* @param {*} [data] | ||
* @param {String} callbackId | ||
* Call function registered as remote command if `rawMessage` is valid luster IPC message | ||
* @param {WorkerWrapper|Worker} target object with `remoteCall` method which can be used to respond to message | ||
* @param {*} rawMessage | ||
* @see RPC | ||
* @private | ||
*/ | ||
this.registerRemoteCommand(command, function(proc, data, callbackId) { | ||
_onMessage(target, rawMessage) { | ||
const message = RPC.parseMessage(rawMessage); | ||
if (message === null) { | ||
return; | ||
} | ||
if (!has(this._remoteCommands, message.cmd)) { | ||
throw LusterClusterProcessError.createError( | ||
LusterClusterProcessError.CODES.REMOTE_COMMAND_IS_NOT_REGISTERED, | ||
{ | ||
name: message.cmd, | ||
klass: this.constructor.name | ||
}); | ||
} else if (typeof message.args === 'undefined') { | ||
this._remoteCommands[message.cmd](target); | ||
} else { | ||
this._remoteCommands[message.cmd](target, ...message.args); | ||
} | ||
} | ||
/** | ||
* Register remote command with respect to the presence of callback | ||
* @param {String} command | ||
* @param {Function} handler | ||
*/ | ||
registerRemoteCommandWithCallback(command, handler) { | ||
/** | ||
* @param {*} [callbackData] | ||
* @param {ClusterProcess} proc | ||
* @param {*} [data] | ||
* @param {String} callbackId | ||
*/ | ||
return handler(function(callbackData) { | ||
proc.remoteCall(RPC.fns.callback, callbackId, callbackData); | ||
}, data); | ||
}); | ||
}; | ||
this.registerRemoteCommand(command, (proc, data, callbackId) => { | ||
/** | ||
* @param {*} [callbackData] | ||
*/ | ||
return handler(callbackData => { | ||
proc.remoteCall(RPC.fns.callback, callbackId, callbackData); | ||
}, data); | ||
}); | ||
} | ||
} | ||
module.exports = ClusterProcess; |
@@ -1,5 +0,4 @@ | ||
var LusterConfigurationError = require('../errors').LusterConfigurationError, | ||
const LusterConfigurationError = require('../errors').LusterConfigurationError, | ||
typeOf = require('./helpers').typeOf, | ||
get = require('./helpers').get, | ||
CHECKS; | ||
get = require('./helpers').get; | ||
@@ -19,3 +18,3 @@ /** | ||
*/ | ||
CHECKS = { | ||
const CHECKS = { | ||
// path to worker main module | ||
@@ -55,4 +54,3 @@ 'app': { required: true, type: 'string' }, | ||
function checkProperty(path, value, check) { | ||
var type = typeOf(value), | ||
allowedTypes; | ||
const type = typeOf(value); | ||
@@ -71,3 +69,3 @@ // required property | ||
// allowed types | ||
allowedTypes = check.type && [].concat(check.type); | ||
const allowedTypes = check.type && [].concat(check.type); | ||
if (allowedTypes && allowedTypes.indexOf(type) === -1) { | ||
@@ -78,3 +76,3 @@ throw LusterConfigurationError.createError( | ||
property: path, | ||
type: type, | ||
type, | ||
expected: allowedTypes.join(' or ') | ||
@@ -91,15 +89,13 @@ }); | ||
function checkConfiguration(conf) { | ||
var failedChecks = 0; | ||
let failedChecks = 0; | ||
Object | ||
.keys(CHECKS) | ||
.forEach(function(path) { | ||
// @todo avoid try..catch | ||
try { | ||
checkProperty(path, get(conf, path), CHECKS[path]); | ||
} catch (error) { | ||
LusterConfigurationError.ensureError(error).log(); | ||
++failedChecks; | ||
} | ||
}); | ||
for (const path of Object.keys(CHECKS)) { | ||
// @todo avoid try..catch | ||
try { | ||
checkProperty(path, get(conf, path), CHECKS[path]); | ||
} catch (error) { | ||
LusterConfigurationError.ensureError(error).log(); | ||
++failedChecks; | ||
} | ||
} | ||
@@ -106,0 +102,0 @@ return failedChecks; |
@@ -1,2 +0,2 @@ | ||
var util = require('util'), | ||
const util = require('util'), | ||
LusterConfigurationError = require('../errors').LusterConfigurationError; | ||
@@ -9,3 +9,3 @@ | ||
function typeOf(value) { | ||
var type = typeof value; | ||
let type = typeof value; | ||
@@ -33,12 +33,11 @@ if (type === 'object') { | ||
function set(context, path, value) { | ||
var ctx = context, | ||
props = path.split('.'), | ||
let ctx = context; | ||
const props = path.split('.'), | ||
target = props.pop(), | ||
i, size, | ||
propName, | ||
type; | ||
size = props.length; | ||
for (i = 0, size = props.length; i < size; i++) { | ||
propName = props[i]; | ||
type = typeOf(ctx[propName]); | ||
for (let i = 0; i < size; i++) { | ||
const propName = props[i]; | ||
const type = typeOf(ctx[propName]); | ||
@@ -71,8 +70,8 @@ if (type === 'undefined') { | ||
var props = path.split('.'), | ||
prop = props[0], | ||
i, size, | ||
ctx = context; | ||
const props = path.split('.'), | ||
size = props.length; | ||
for (i = 0, size = props.length; i < size; prop = props[++i]) { | ||
let ctx = context; | ||
for (let i = 0, prop = props[0]; i < size; prop = props[++i]) { | ||
if (typeof ctx === 'undefined' || ctx === null || | ||
@@ -99,8 +98,8 @@ ! Object.prototype.hasOwnProperty.call(ctx, prop)) { | ||
var props = path.split('.'), | ||
prop = props[0], | ||
i, size, | ||
ctx = context; | ||
const props = path.split('.'), | ||
size = props.length; | ||
for (i = 0, size = props.length; i < size; prop = props[++i]) { | ||
let ctx = context; | ||
for (let i = 0, prop = props[0]; i < size; prop = props[++i]) { | ||
if (typeof ctx === 'undefined' || ctx === null || | ||
@@ -118,6 +117,6 @@ ! Object.prototype.hasOwnProperty.call(ctx, prop)) { | ||
module.exports = { | ||
typeOf: typeOf, | ||
has: has, | ||
get: get, | ||
set: set, | ||
typeOf, | ||
has, | ||
get, | ||
set, | ||
}; |
@@ -1,2 +0,2 @@ | ||
var path = require('path'), | ||
const path = require('path'), | ||
typeOf = require('./helpers').typeOf, | ||
@@ -11,174 +11,193 @@ get = require('./helpers').get, | ||
*/ | ||
function Configuration(config, basedir) { | ||
/** @private */ | ||
this._resolveBaseDir = basedir || process.cwd(); | ||
class Configuration { | ||
constructor(config, basedir) { | ||
/** @private */ | ||
this._resolveBaseDir = basedir || process.cwd(); | ||
Object | ||
.keys(config) | ||
.forEach(function(propName) { | ||
this[propName] = config[propName]; | ||
}, this); | ||
} | ||
if (config instanceof Configuration) { | ||
config = config._rawConfig; | ||
} | ||
this._rawConfig = {}; | ||
Object.assign(this._rawConfig, config); | ||
} | ||
/** | ||
* @param {String} path | ||
* @param {*} [defaultValue] | ||
* @returns {*} | ||
* @see get | ||
* @public | ||
*/ | ||
Configuration.prototype.get = function(path, defaultValue) { | ||
return get(this, path, defaultValue); | ||
}; | ||
get raw() { | ||
return this._rawConfig; | ||
} | ||
/** | ||
* @param {String} path | ||
* @returns {Boolean} | ||
* @see has | ||
* @public | ||
*/ | ||
Configuration.prototype.has = function(path) { | ||
return has(this, path); | ||
}; | ||
/** | ||
* @param {String} path | ||
* @param {*} [defaultValue] | ||
* @returns {*} | ||
* @see get | ||
* @public | ||
*/ | ||
get(path, defaultValue) { | ||
return get(this._rawConfig, path, defaultValue); | ||
} | ||
/** | ||
* Shortcut for `Object.keys(c.get('some.obj.prop', {}))` | ||
* @param {String} [path] | ||
* @returns {String[]} keys of object property by path or | ||
* empty array if property doesn't exists or not an object | ||
* @public | ||
*/ | ||
Configuration.prototype.getKeys = function(path) { | ||
var obj = get(this, path); | ||
/** | ||
* @param {String} path | ||
* @returns {Boolean} | ||
* @see has | ||
* @public | ||
*/ | ||
has(path) { | ||
return has(this._rawConfig, path); | ||
} | ||
if (typeOf(obj) !== 'object') { | ||
return []; | ||
} else { | ||
return Object.keys(obj); | ||
/** | ||
* Shortcut for `Object.keys(c.get('some.obj.prop', {}))` | ||
* @param {String} [path] | ||
* @returns {String[]} keys of object property by path or | ||
* empty array if property doesn't exists or not an object | ||
* @public | ||
*/ | ||
getKeys(path) { | ||
const obj = get(this._rawConfig, path); | ||
if (typeOf(obj) !== 'object') { | ||
return []; | ||
} else { | ||
return Object.keys(obj); | ||
} | ||
} | ||
}; | ||
/** | ||
* Shortcut for `path.resolve(process.cwd(), c.get(path, 'default.file'))` | ||
* @param {String} propPath | ||
* @param {String} [defaultPath] | ||
* @returns {String} absolute path | ||
* @public | ||
*/ | ||
Configuration.prototype.resolve = function(propPath, defaultPath) { | ||
return path.resolve( | ||
this._resolveBaseDir, | ||
get(this, propPath, defaultPath)); | ||
}; | ||
/** | ||
* Shortcut for `path.resolve(process.cwd(), c.get(path, 'default.file'))` | ||
* @param {String} propPath | ||
* @param {String} [defaultPath] | ||
* @returns {String} absolute path | ||
* @public | ||
*/ | ||
resolve(propPath, defaultPath) { | ||
return path.resolve( | ||
this._resolveBaseDir, | ||
get(this._rawConfig, propPath, defaultPath)); | ||
} | ||
/** | ||
* @returns {String} base dir used in `resolve()` | ||
* @public | ||
*/ | ||
Configuration.prototype.getBaseDir = function() { | ||
return this._resolveBaseDir; | ||
}; | ||
/** | ||
* @returns {String} base dir used in `resolve()` | ||
* @public | ||
*/ | ||
getBaseDir() { | ||
return this._resolveBaseDir; | ||
} | ||
/** | ||
* Create Configuration instance from plain object | ||
* @param {Object|*} config | ||
* @param {String} basedir - base dir for `resolve` method | ||
* @returns {Configuration|*} Configuration instance if `config` is object or `config` itself in other case | ||
* @public | ||
* @static | ||
*/ | ||
Configuration.extend = function(config, basedir) { | ||
return typeOf(config) === 'object' ? | ||
new Configuration(config, basedir) : | ||
config; | ||
}; | ||
/** | ||
* Create Configuration instance from plain object | ||
* @param {Object|*} config | ||
* @param {String} basedir - base dir for `resolve` method | ||
* @returns {Configuration|*} Configuration instance if `config` is object or `config` itself in other case | ||
* @public | ||
* @static | ||
*/ | ||
static extend(config, basedir) { | ||
return typeOf(config) === 'object' ? | ||
new Configuration(config, basedir) : | ||
config; | ||
} | ||
/** | ||
* Override config properties using `LUSTER_CONF` environment variable. | ||
* | ||
* @description | ||
* LUSTER_CONF='PATH=VALUE;...' | ||
* | ||
* ; – properties separator; | ||
* = – property and value separator; | ||
* PATH – property path; | ||
* VALUE – property value, JSON.parse applied to it, | ||
* if JSON.parse failed, then value used as string. | ||
* You MUST quote a string if it contains semicolon. | ||
* | ||
* Spaces between PATH, "=", ";" and VALUE are insignificant. | ||
* | ||
* @example | ||
* LUSTER_CONF='server.port=8080' | ||
* # { server: { port: 8080 } } | ||
* | ||
* LUSTER_CONF='app=./worker_debug.js; workers=1' | ||
* # { app: "./worker_debug.js", workers : 1 } | ||
* | ||
* LUSTER_CONF='logStream=' | ||
* # remove option "logStream" | ||
* | ||
* LUSTER_CONF='server={"port":8080}' | ||
* # { server: { port: 8080 } } | ||
* | ||
* @param {Object} config | ||
* @throws {LusterConfigurationError} if you trying to | ||
* set property of atomic property, for example, | ||
* error will be thrown if you have property | ||
* `extensions.sample.x = 10` in the configuration and | ||
* environment variable `LUSTER_EXTENSIONS_SAMPLE_X_Y=5` | ||
*/ | ||
Configuration.applyEnvironment = function(config) { | ||
if ( ! process.env.LUSTER_CONF) { | ||
return; | ||
extend(config) { | ||
return new Configuration( | ||
{ | ||
...this.raw, | ||
...config, | ||
}, | ||
this._resolveBaseDir, | ||
); | ||
} | ||
function parseProp(prop) { | ||
var delimeterPos = prop.indexOf('='), | ||
propPath, | ||
propValue; | ||
if (delimeterPos === 0 || delimeterPos === -1) { | ||
/** | ||
* Override config properties using `LUSTER_CONF` environment variable. | ||
* | ||
* @description | ||
* LUSTER_CONF='PATH=VALUE;...' | ||
* | ||
* ; – properties separator; | ||
* = – property and value separator; | ||
* PATH – property path; | ||
* VALUE – property value, JSON.parse applied to it, | ||
* if JSON.parse failed, then value used as string. | ||
* You MUST quote a string if it contains semicolon. | ||
* | ||
* Spaces between PATH, "=", ";" and VALUE are insignificant. | ||
* | ||
* @example | ||
* LUSTER_CONF='server.port=8080' | ||
* # { server: { port: 8080 } } | ||
* | ||
* LUSTER_CONF='app=./worker_debug.js; workers=1' | ||
* # { app: "./worker_debug.js", workers : 1 } | ||
* | ||
* LUSTER_CONF='logStream=' | ||
* # remove option "logStream" | ||
* | ||
* LUSTER_CONF='server={"port":8080}' | ||
* # { server: { port: 8080 } } | ||
* | ||
* @param {Object} config | ||
* @throws {LusterConfigurationError} if you trying to | ||
* set property of atomic property, for example, | ||
* error will be thrown if you have property | ||
* `extensions.sample.x = 10` in the configuration and | ||
* environment variable `LUSTER_EXTENSIONS_SAMPLE_X_Y=5` | ||
*/ | ||
static applyEnvironment(config) { | ||
if (!process.env.LUSTER_CONF) { | ||
return; | ||
} | ||
propPath = prop.substr(0, delimeterPos).trim(); | ||
propValue = prop.substr(delimeterPos + 1).trim(); | ||
if (config instanceof Configuration) { | ||
config = config._rawConfig; | ||
} | ||
if (propValue === '') { | ||
propValue = undefined; | ||
} else { | ||
try { | ||
// try to parse propValue as JSON, | ||
// if parsing failed use raw `propValue` as string | ||
propValue = JSON.parse(propValue); | ||
} catch(error) { // eslint-disable-line no-empty | ||
function parseProp(prop) { | ||
const delimeterPos = prop.indexOf('='); | ||
if (delimeterPos === 0 || delimeterPos === -1) { | ||
return; | ||
} | ||
const propPath = prop.substr(0, delimeterPos).trim(); | ||
let propValue = prop.substr(delimeterPos + 1).trim(); | ||
if (propValue === '') { | ||
propValue = undefined; | ||
} else { | ||
try { | ||
// try to parse propValue as JSON, | ||
// if parsing failed use raw `propValue` as string | ||
propValue = JSON.parse(propValue); | ||
} catch (error) { // eslint-disable-line no-empty | ||
} | ||
} | ||
set(config, propPath, propValue); | ||
} | ||
set(config, propPath, propValue); | ||
} | ||
const conf = process.env.LUSTER_CONF; | ||
var conf = process.env.LUSTER_CONF, | ||
lastSeparator = -1, | ||
i = 0, | ||
openQuote = false; | ||
let lastSeparator = -1, | ||
i = 0, | ||
openQuote = false; | ||
while (conf.length > i++) { | ||
switch (conf[i]) { | ||
case '"' : | ||
openQuote = ! openQuote; | ||
break; | ||
case ';' : | ||
if ( ! openQuote) { | ||
parseProp(conf.substring(lastSeparator + 1, i)); | ||
lastSeparator = i; | ||
while (conf.length > i++) { | ||
switch (conf[i]) { | ||
case '"' : | ||
openQuote = !openQuote; | ||
break; | ||
case ';' : | ||
if (!openQuote) { | ||
parseProp(conf.substring(lastSeparator + 1, i)); | ||
lastSeparator = i; | ||
} | ||
} | ||
} | ||
} | ||
if (lastSeparator < conf.length) { | ||
parseProp(conf.substring(lastSeparator + 1)); | ||
if (lastSeparator < conf.length) { | ||
parseProp(conf.substring(lastSeparator + 1)); | ||
} | ||
} | ||
}; | ||
} | ||
@@ -185,0 +204,0 @@ Configuration.check = require('./check'); |
@@ -1,2 +0,2 @@ | ||
var Terror = require('terror'), | ||
const Terror = require('terror'), | ||
errors = {}; | ||
@@ -9,3 +9,3 @@ | ||
*/ | ||
var LusterError = errors.LusterError = Terror.create('LusterError', | ||
const LusterError = errors.LusterError = Terror.create('LusterError', | ||
{ | ||
@@ -98,2 +98,13 @@ ABSTRACT_METHOD_IS_NOT_IMPLEMENTED: | ||
/** | ||
* @constructor | ||
* @class LusterMasterError | ||
* @augments LusterError | ||
*/ | ||
errors.LusterMasterError = LusterError.create('LusterMasterError', | ||
{ | ||
POOL_KEY_ALREADY_TAKEN: | ||
'Pool key "%key%" is already taken' | ||
}); | ||
module.exports = errors; |
@@ -1,12 +0,10 @@ | ||
var Objex = require('objex'), | ||
util = require('util'), | ||
EventEmitter = require('events').EventEmitter, | ||
const util = require('util'), | ||
{ EventEmitter } = require('events').EventEmitter; | ||
/** | ||
* @constructor | ||
* @class EventEmitterEx | ||
* @augments EventEmitter | ||
* @augments Objex | ||
*/ | ||
EventEmitterEx = Objex.wrap(EventEmitter).create(); | ||
/** | ||
* @constructor | ||
* @class EventEmitterEx | ||
* @augments EventEmitter | ||
*/ | ||
class EventEmitterEx extends EventEmitter {} | ||
@@ -19,10 +17,10 @@ function inspect(val) { | ||
if (process.env.NODE_DEBUG && /luster:eex/i.test(process.env.NODE_DEBUG)) { | ||
EventEmitterEx.prototype.emit = function() { | ||
var args = Array.prototype | ||
.slice.call(arguments, 0) | ||
.map(inspect); | ||
EventEmitterEx.prototype.emit = function(...args) { | ||
const inspectedArgs = args.map(inspect).join(', '); | ||
console.log('%s(%s).emit(%s)', this.constructor.name || 'EventEmitterEx', this.wid, args.join(', ')); | ||
const key = this.eexKey; | ||
return EventEmitterEx.__super.prototype.emit.apply(this, arguments); | ||
console.log('%s(%s).emit(%s)', this.constructor.name || 'EventEmitterEx', key, inspectedArgs); | ||
return EventEmitter.prototype.emit.apply(this, args); | ||
}; | ||
@@ -29,0 +27,0 @@ } |
@@ -1,2 +0,2 @@ | ||
var cluster = require('cluster'), | ||
const cluster = require('cluster'), | ||
/** @type {ClusterProcess} */ | ||
@@ -3,0 +3,0 @@ Proc = require(cluster.isMaster ? './master' : './worker'); |
@@ -1,10 +0,11 @@ | ||
var os = require('os'), | ||
cluster = require('cluster'), | ||
extend = require('extend'), | ||
ClusterProcess = require('./cluster_process'), | ||
WorkerWrapper = require('./worker_wrapper'), | ||
Port = require('./port'), | ||
RestartQueue = require('./restart_queue'), | ||
Master; | ||
const cluster = require('cluster'); | ||
const ClusterProcess = require('./cluster_process'); | ||
const LusterMasterError = require('./errors').LusterMasterError; | ||
const RPC = require('./rpc'); | ||
const WorkerPool = require('./worker-pool'); | ||
const WorkerWrapper = require('./worker_wrapper'); | ||
const DEFAULT_POOL_KEY = '__default'; | ||
/** | ||
@@ -15,453 +16,363 @@ * @constructor | ||
*/ | ||
Master = ClusterProcess.create(function Master() { | ||
Master.__super.apply(this, arguments); | ||
class Master extends ClusterProcess { | ||
constructor() { | ||
super(); | ||
/** | ||
* Configuration object to pass to cluster.setupMaster() | ||
* @type {Object} | ||
* @private | ||
*/ | ||
this._masterOpts = {}; | ||
this.pools = new Map(); | ||
this.createPool(DEFAULT_POOL_KEY); | ||
this.id = 0; | ||
this.wid = 0; | ||
this.eexKey = 0; | ||
this.pid = process.pid; | ||
// @todo make it optional? | ||
process.on('SIGINT', this._onSignalQuit.bind(this)); | ||
process.on('SIGQUIT', this._onSignalQuit.bind(this)); | ||
} | ||
createPool(key) { | ||
if (this.pools.has(key)) { | ||
throw LusterMasterError.createError( | ||
LusterMasterError.CODES.POOL_KEY_ALREADY_TAKEN, | ||
{key} | ||
); | ||
} | ||
this.emit('create pool', key); | ||
const pool = new WorkerPool(key, this); | ||
this._proxyWorkerEvents(pool); | ||
pool.on('shutdown', this._checkPoolsAlive.bind(this)); | ||
this.pools.set(key, pool); | ||
return pool; | ||
} | ||
getPool(key) { | ||
return this.pools.get(key); | ||
} | ||
/** | ||
* @type {Object} | ||
* @property {WorkerWrapper} * | ||
* @public | ||
* @todo make it private or public immutable | ||
* Allows same object structure as cluster.setupMaster(). | ||
* This function must be used instead of cluster.setupMaster(), | ||
* because all calls of cluster.setupMaster() ignored, except first one. | ||
* An instance of Master will call it, when running. | ||
* @param {Object} opts | ||
* @see {@link http://nodejs.org/api/cluster.html#cluster_cluster_setupmaster_settings} | ||
*/ | ||
this.workers = {}; | ||
setup(opts) { | ||
Object.assign(this._masterOpts, opts); | ||
} | ||
/** | ||
* Workers restart queue. | ||
* @type {RestartQueue} | ||
* SIGINT and SIGQUIT handler | ||
* @private | ||
*/ | ||
this._restartQueue = new RestartQueue(); | ||
_onSignalQuit() { | ||
this | ||
.once('shutdown', () => process.exit(0)) | ||
.shutdown(); | ||
} | ||
/** | ||
* Configuration object to pass to cluster.setupMaster() | ||
* @type {Object} | ||
* Check for alive workers, if no one here, then emit "shutdown". | ||
* @private | ||
*/ | ||
this._masterOpts = {}; | ||
_checkPoolsAlive() { | ||
let dead = true; | ||
this.forEachPool(pool => dead = dead && pool.dead); | ||
this.id = 0; | ||
this.wid = 0; | ||
this.pid = process.pid; | ||
if (dead) { | ||
this.emit('shutdown'); | ||
} | ||
} | ||
this.on('worker state', this._cleanupUnixSockets.bind(this)); | ||
this.on('worker exit', this._checkWorkersAlive.bind(this)); | ||
/** | ||
* Repeat WorkerWrapper events from WorkerPool on Master | ||
* so for example 'online' became 'worker online' | ||
* @private | ||
* @param {WorkerPool} pool | ||
*/ | ||
_proxyWorkerEvents(pool) { | ||
for (const eventName of WorkerWrapper.EVENTS) { | ||
const proxyEventName = 'worker ' + eventName; | ||
pool.on(proxyEventName, this.emit.bind(this, proxyEventName)); | ||
} | ||
} | ||
// @todo make it optional? | ||
process.on('SIGINT', this._onSignalQuit.bind(this)); | ||
process.on('SIGQUIT', this._onSignalQuit.bind(this)); | ||
}); | ||
/** | ||
* @returns {number[]} workers ids array | ||
*/ | ||
getWorkersIds() { | ||
return this.getWorkersArray().map(w => w.wid); | ||
} | ||
/** | ||
* Allows same object structure as cluster.setupMaster(). | ||
* This function must be used instead of cluster.setupMaster(), | ||
* because all calls of cluster.setupMaster() ignored, except first one. | ||
* An instance of Master will call it, when running. | ||
* @param {Object} opts | ||
* @see {@link http://nodejs.org/api/cluster.html#cluster_cluster_setupmaster_settings} | ||
*/ | ||
Master.prototype.setup = function(opts) { | ||
extend(this._masterOpts, opts); | ||
}; | ||
/** | ||
* @returns {WorkerWrapper[]} workers array | ||
*/ | ||
getWorkersArray() { | ||
let result = []; | ||
this.forEachPool( | ||
pool => result = result.concat(pool.getWorkersArray()) | ||
); | ||
return result; | ||
} | ||
/** | ||
* SIGINT and SIGQUIT handler | ||
* @private | ||
*/ | ||
Master.prototype._onSignalQuit = function() { | ||
this | ||
.once('shutdown', function() { | ||
process.exit(0); | ||
}) | ||
.shutdown(); | ||
}; | ||
/** | ||
* Remove not used unix socket before worker will try to listen it. | ||
* @param {WorkerWrapper} worker | ||
* @param {WorkerWrapperState} state | ||
* @private | ||
*/ | ||
Master.prototype._cleanupUnixSockets = function(worker, state) { | ||
var port = worker.options.port; | ||
if (this._restartQueue.has(worker) || | ||
state !== WorkerWrapper.STATES.LAUNCHING || | ||
port.family !== Port.UNIX) { | ||
return; | ||
forEachPool(fn) { | ||
for (const pool of this.pools.values()) { | ||
fn(pool); | ||
} | ||
} | ||
var self = this, | ||
inUse = this.getWorkersArray().some(function(w) { | ||
return worker.wid !== w.wid && | ||
w.isRunning() && | ||
port.isEqualTo(w.options.port); | ||
/** | ||
* Iterate over workers in the pool. | ||
* @param {Function} fn | ||
* @public | ||
* @returns {Master} self | ||
* | ||
* @description Shortcut for: | ||
* master.getWorkersArray().forEach(fn); | ||
*/ | ||
forEach(fn) { | ||
this.forEachPool(pool => { | ||
pool.forEach(fn); | ||
}); | ||
if ( ! inUse) { | ||
port.unlink(function(err) { | ||
if (err) { | ||
self.emit('error', err); | ||
} | ||
}); | ||
return this; | ||
} | ||
}; | ||
/** | ||
* Check for alive workers, if no one here, then emit "shutdown". | ||
* @private | ||
*/ | ||
Master.prototype._checkWorkersAlive = function() { | ||
var workers = this.getWorkersArray(), | ||
alive = workers.reduce(function(count, w) { | ||
return w.dead ? count - 1 : count; | ||
}, workers.length); | ||
if (alive === 0) { | ||
this.emit('shutdown'); | ||
/** | ||
* Broadcast an event received by IPC from worker as 'worker <event>'. | ||
* @param {WorkerWrapper} worker | ||
* @param {String} event | ||
* @param {...*} args | ||
*/ | ||
broadcastWorkerEvent(worker, event, ...args) { | ||
this.emit('received worker ' + event, worker, ...args); | ||
} | ||
}; | ||
/** | ||
* Repeat WorkerWrapper events on Master and add 'worker ' prefix to event names | ||
* so for example 'online' became 'worker online' | ||
* @private | ||
* @param {WorkerWrapper} worker | ||
*/ | ||
Master.prototype._proxyWorkerEvents = function(worker) { | ||
WorkerWrapper.EVENTS | ||
.forEach(function(eventName) { | ||
worker.on(eventName, this.emit.bind(this, 'worker ' + eventName, worker)); | ||
}, this); | ||
}; | ||
/** | ||
* Configure cluster | ||
* @override ClusterProcess | ||
* @private | ||
*/ | ||
_onConfigured() { | ||
super._onConfigured(); | ||
/** | ||
* @returns {WorkerWrapper[]} workers array | ||
*/ | ||
Master.prototype.getWorkersArray = function() { | ||
if ( ! this._workersArrayCache) { | ||
this._workersArrayCache = Object.keys(this.workers).map(function(wid) { | ||
return this.workers[wid]; | ||
}, this); | ||
// register global remote command in the context of master to receive events from master | ||
if (!this.hasRegisteredRemoteCommand(RPC.fns.master.broadcastWorkerEvent)) { | ||
this.registerRemoteCommand( | ||
RPC.fns.master.broadcastWorkerEvent, | ||
this.broadcastWorkerEvent.bind(this) | ||
); | ||
} | ||
this.pools.get(DEFAULT_POOL_KEY).configure(this.config); | ||
} | ||
return this._workersArrayCache; | ||
}; | ||
/** | ||
* @param {Number[]} wids Array of `WorkerWrapper#wid` values | ||
* @param {String} event wait for | ||
* @public | ||
* @returns {Promise<void>} | ||
*/ | ||
waitForWorkers(wids, event) { | ||
const promises = []; | ||
this.forEachPool( | ||
pool => promises.push(pool.waitForWorkers(wids, event)) | ||
); | ||
/** | ||
* Add worker to the pool | ||
* @param {WorkerWrapper} worker | ||
* @returns {Master} self | ||
* @public | ||
*/ | ||
Master.prototype.add = function(worker) { | ||
// invalidate Master#getWorkersArray cache | ||
this._workersArrayCache = null; | ||
return Promise.all(promises); | ||
} | ||
this.workers[worker.wid] = worker; | ||
this._proxyWorkerEvents(worker); | ||
/** | ||
* @param {String} event wait for | ||
* @public | ||
* @returns {Promise<void>} | ||
*/ | ||
waitForAllWorkers(event) { | ||
return this.waitForWorkers( | ||
this.getWorkersIds(), | ||
event | ||
); | ||
} | ||
return this; | ||
}; | ||
/** | ||
* @event Master#running | ||
*/ | ||
/** | ||
* Iterate over workers in the pool. | ||
* @param {Function} fn | ||
* @public | ||
* @returns {Master} self | ||
* | ||
* @description Shortcut for: | ||
* master.getWorkersArray().forEach(function(worker) { | ||
* // fn | ||
* }, master); | ||
*/ | ||
Master.prototype.forEach = function(fn) { | ||
this.getWorkersArray().forEach(function(worker) { | ||
fn.call(this, worker); | ||
}, this); | ||
/** | ||
* @event Master#restarted | ||
*/ | ||
return this; | ||
}; | ||
async _restart() { | ||
// TODO maybe run this after starting waitForAllWorkers | ||
this.forEachPool(pool => pool.restart()); | ||
/** | ||
* Configure cluster | ||
* @override ClusterProcess | ||
* @private | ||
*/ | ||
Master.prototype._onConfigured = function() { | ||
Master.__super.prototype._onConfigured.apply(this, arguments); | ||
await this.waitForAllWorkers('worker ready'); | ||
var // WorkerWrapper options | ||
forkTimeout = this.config.get('control.forkTimeout'), | ||
stopTimeout = this.config.get('control.stopTimeout'), | ||
exitThreshold = this.config.get('control.exitThreshold'), | ||
allowedSequentialDeaths = this.config.get('control.allowedSequentialDeaths'), | ||
this.emit('restarted'); | ||
} | ||
// workers and groups count | ||
i = 0, | ||
count = this.config.get('workers', os.cpus().length), | ||
isServerPortSet = this.config.has('server.port'), | ||
port, | ||
groups = this.config.get('server.groups', 1), | ||
group = 0, | ||
workersPerGroup = Math.floor(count / groups), | ||
workersInGroup = 0; | ||
if (isServerPortSet) { | ||
port = new Port(this.config.get('server.port')); | ||
/** | ||
* Hard workers restart: all workers will be restarted at same time. | ||
* CAUTION: if dead worker is restarted, it will emit 'error' event. | ||
* @public | ||
* @returns {Master} self | ||
* @fires Master#restarted when workers spawned and ready. | ||
*/ | ||
restart() { | ||
this._restart(); | ||
return this; | ||
} | ||
// create pool of workers | ||
while (count > i++) { | ||
this.add(new WorkerWrapper(this, { | ||
forkTimeout: forkTimeout, | ||
stopTimeout: stopTimeout, | ||
exitThreshold: exitThreshold, | ||
allowedSequentialDeaths: allowedSequentialDeaths, | ||
port: isServerPortSet ? port.next(group) : 0, | ||
maxListeners: this.getMaxListeners(), | ||
})); | ||
async _softRestart() { | ||
const promises = []; | ||
this.forEachPool( | ||
pool => promises.push(pool.softRestart()) | ||
); | ||
// groups > 1, current group is full and | ||
// last workers can form at least more one group | ||
if (groups > 1 && | ||
++workersInGroup >= workersPerGroup && | ||
count - (group + 1) * workersPerGroup >= workersPerGroup) { | ||
workersInGroup = 0; | ||
group++; | ||
} | ||
await Promise.all(promises); | ||
this.emit('restarted'); | ||
} | ||
}; | ||
/** | ||
* @param {Number[]} wids Array of `WorkerWrapper#wid` values | ||
* @param {String} event wait for | ||
* @param {Function} callback | ||
* @public | ||
* @returns {Master} self | ||
*/ | ||
Master.prototype.waitForWorkers = function(wids, event, callback) { | ||
var self = this; | ||
/** | ||
* Workers will be restarted one by one using RestartQueue. | ||
* If a worker becomes dead, it will be just removed from restart queue. However, if already dead worker is pushed | ||
* into the queue, it will emit 'error' on restart. | ||
* @public | ||
* @returns {Master} self | ||
* @fires Master#restarted when workers spawned and ready. | ||
*/ | ||
softRestart() { | ||
this._softRestart(); | ||
return this; | ||
} | ||
function onWorkerState(worker) { | ||
var idx = wids.indexOf(worker.wid); | ||
/** | ||
* @override | ||
* @see ClusterProcess | ||
* @private | ||
*/ | ||
_setupIPCMessagesHandler() { | ||
this.on('worker message', this._onMessage.bind(this)); | ||
} | ||
if (idx > -1) { | ||
wids.splice(idx, 1); | ||
} | ||
/** | ||
* RPC to all workers | ||
* @method | ||
* @param {String} name of called command in the worker | ||
* @param {...*} args | ||
* @public | ||
*/ | ||
remoteCallToAll(name, ...args) { | ||
this.forEachPool(pool => pool.remoteCallToAll(name, ...args)); | ||
} | ||
if (wids.length === 0) { | ||
self.removeListener(event, onWorkerState); | ||
callback.call(self); | ||
} | ||
/** | ||
* Broadcast event to all workers. | ||
* @method | ||
* @param {String} event of called command in the worker | ||
* @param {...*} args | ||
* @public | ||
*/ | ||
broadcastEventToAll(event, ...args) { | ||
this.forEachPool(pool => pool.broadcastEventToAll(event, ...args)); | ||
} | ||
if (wids.length > 0) { | ||
this.on(event, onWorkerState); | ||
} else { | ||
setImmediate(callback.bind(self)); | ||
/** | ||
* Emit event on master and all workers in "ready" state. | ||
* @method | ||
* @param {String} event of called command in the worker | ||
* @param {...*} args | ||
* @public | ||
*/ | ||
emitToAll(event, ...args) { | ||
this.forEachPool(pool => pool.emitToAll(event, ...args)); | ||
} | ||
return this; | ||
}; | ||
/** | ||
* @event Master#shutdown | ||
*/ | ||
/** | ||
* @event Master#running | ||
*/ | ||
async _shutdown() { | ||
const promises = []; | ||
this.forEachPool( | ||
pool => promises.push(pool._shutdown()) | ||
); | ||
/** | ||
* Fork workers. | ||
* Execution will be delayed until Master became configured | ||
* (`configured` event fired). | ||
* @method | ||
* @returns {Master} self | ||
* @public | ||
* @fires Master#running then workers spawned and ready. | ||
* | ||
* @example | ||
* // file: master.js | ||
* var master = require('luster'); | ||
* | ||
* master | ||
* .configure({ app : 'worker' }) | ||
* .run(); | ||
* | ||
* // there is master is still not running anyway | ||
* // it will run immediate once configured and | ||
* // current thread execution done | ||
*/ | ||
Master.prototype.run = Master.whenInitialized(function() { | ||
cluster.setupMaster(this._masterOpts); | ||
await Promise.all(promises); | ||
this.waitForWorkers( | ||
this.getWorkersArray().map(function(worker) { | ||
worker.run(); | ||
this.emit('shutdown'); | ||
} | ||
return worker.wid; | ||
}), | ||
'worker ready', | ||
function() { | ||
this.emit('running'); | ||
}); | ||
/** | ||
* Stop all workers and emit `Master#shutdown` event after successful shutdown of all workers. | ||
* @fires Master#shutdown | ||
* @returns {Master} | ||
*/ | ||
shutdown() { | ||
this._shutdown(); | ||
return this; | ||
} | ||
return this; | ||
}); | ||
/** | ||
* Do a remote call to all workers, callbacks are registered and then executed separately for each worker | ||
* @method | ||
* @param {String} opts.command | ||
* @param {Function} opts.callback | ||
* @param {Number} [opts.timeout] in milliseconds | ||
* @param {*} [opts.data] | ||
* @public | ||
*/ | ||
remoteCallToAllWithCallback(opts) { | ||
this.forEachPool(pool => pool.remoteCallToAllWithCallback(opts)); | ||
} | ||
/** | ||
* @event Master#restarted | ||
*/ | ||
async _run() { | ||
await this.whenInitialized(); | ||
/** | ||
* Hard workers restart: all workers will be restarted at same time. | ||
* CAUTION: if dead worker is restarted, it will emit 'error' event. | ||
* @public | ||
* @returns {Master} self | ||
* @fires Master#restarted when workers spawned and ready. | ||
*/ | ||
Master.prototype.restart = function() { | ||
this.waitForWorkers( | ||
this.getWorkersArray().map(function(worker) { | ||
worker.restart(); | ||
cluster.setupMaster(this._masterOpts); | ||
return worker.wid; | ||
}), | ||
'worker ready', | ||
function() { | ||
this.emit('restarted'); | ||
}); | ||
// TODO maybe run this after starting waitForAllWorkers | ||
this.forEachPool(pool => pool.run()); | ||
return this; | ||
}; | ||
await this.waitForAllWorkers('worker ready'); | ||
/** | ||
* Workers will be restarted one by one using RestartQueue. | ||
* If a worker becomes dead, it will be just removed from restart queue. However, if already dead worker is pushed | ||
* into the queue, it will emit 'error' on restart. | ||
* @public | ||
* @returns {Master} self | ||
* @fires Master#restarted when workers spawned and ready. | ||
*/ | ||
Master.prototype.softRestart = function() { | ||
this.forEach(function(worker) { | ||
worker.softRestart(); | ||
}); | ||
this._restartQueue.once('drain', this.emit.bind(this, 'restarted')); | ||
return this; | ||
}; | ||
this.emit('running'); | ||
} | ||
/** | ||
* Schedules one worker restart using RestartQueue. | ||
* If a worker becomes dead, it will be just removed from restart queue. However, if already dead worker is pushed | ||
* into the queue, it will emit 'error' on restart. | ||
* @public | ||
* @param {WorkerWrapper} worker | ||
* @returns {Master} self | ||
*/ | ||
Master.prototype.scheduleWorkerRestart = function(worker) { | ||
this._restartQueue.push(worker); | ||
return this; | ||
}; | ||
/** | ||
* Fork workers. | ||
* Execution will be delayed until Master became configured | ||
* (`configured` event fired). | ||
* @method | ||
* @returns {Master} self | ||
* @public | ||
* @fires Master#running then workers spawned and ready. | ||
* | ||
* @example | ||
* // file: master.js | ||
* var master = require('luster'); | ||
* | ||
* master | ||
* .configure({ app : 'worker' }) | ||
* .run(); | ||
* | ||
* // there is master is still not running anyway | ||
* // it will run immediate once configured and | ||
* // current thread execution done | ||
*/ | ||
run() { | ||
this._run(); | ||
return this; | ||
} | ||
} | ||
/** | ||
* @override | ||
* @see ClusterProcess | ||
* @private | ||
*/ | ||
Master.prototype._setupIPCMessagesHandler = function() { | ||
this.on('worker message', this._onMessage.bind(this)); | ||
}; | ||
/** | ||
* RPC to all workers | ||
* @method | ||
* @param {String} name of called command in the worker | ||
* @param {*} ...args | ||
* @public | ||
*/ | ||
Master.prototype.remoteCallToAll = function(name) { // eslint-disable-line no-unused-vars | ||
var args = Array.prototype.slice.call(arguments, 0); | ||
this.forEach(function(worker) { | ||
if (worker.ready) { | ||
worker.remoteCall.apply(worker, args); | ||
} else { | ||
worker.on('ready', function() { | ||
worker.remoteCall.apply(worker, args); | ||
}.bind(worker, args)); | ||
} | ||
}); | ||
}; | ||
/** | ||
* Broadcast event to all workers. | ||
* @method | ||
* @param {String} event of called command in the worker | ||
* @param {*} ...args | ||
* @public | ||
*/ | ||
Master.prototype.broadcastEventToAll = function() { | ||
var args = Array.prototype.slice.call(arguments, 0); | ||
this.forEach(function(worker) { | ||
if (worker.ready) { | ||
worker.broadcastEvent.apply(worker, args); | ||
} | ||
}); | ||
}; | ||
/** | ||
* Emit event on master and all workers in "ready" state. | ||
* @method | ||
* @param {String} event of called command in the worker | ||
* @param {*} ...args | ||
* @public | ||
*/ | ||
Master.prototype.emitToAll = function() { | ||
this.emit.apply(this, arguments); | ||
this.broadcastEventToAll.apply(this, arguments); | ||
}; | ||
/** | ||
* @event Master#shutdown | ||
*/ | ||
/** | ||
* Stop all workers and emit `Master#shutdown` event after successful shutdown of all workers. | ||
* @fires Master#shutdown | ||
* @returns {Master} | ||
*/ | ||
Master.prototype.shutdown = function() { | ||
var stoppedWorkers = []; | ||
this.forEach(function(worker) { | ||
if (worker.isRunning()) { | ||
worker.stop(); | ||
stoppedWorkers.push(worker.wid); | ||
} | ||
}); | ||
this.waitForWorkers( | ||
stoppedWorkers, | ||
'worker exit', | ||
function() { | ||
this.emit('shutdown'); | ||
}); | ||
return this; | ||
}; | ||
/** | ||
* Do a remote call to all workers, callbacks are registered and then executed separately for each worker | ||
* @method | ||
* @param {String} opts.command | ||
* @param {Function} opts.callback | ||
* @param {Number} [opts.timeout] in milliseconds | ||
* @param {*} [opts.data] | ||
* @public | ||
*/ | ||
Master.prototype.remoteCallToAllWithCallback = function(opts) { | ||
this.forEach(function(worker) { | ||
if (worker.isRunning()) { | ||
worker.remoteCallWithCallback(opts); | ||
} | ||
}); | ||
}; | ||
module.exports = Master; |
168
lib/port.js
@@ -1,2 +0,2 @@ | ||
var fs = require('fs'), | ||
const fs = require('fs'), | ||
LusterPortError = require('./errors').LusterPortError, | ||
@@ -19,4 +19,6 @@ UNIX_SOCKET_MASK = '*'; | ||
*/ | ||
function Port(value) { | ||
this.value = value; | ||
class Port { | ||
constructor(value) { | ||
this.value = value; | ||
} | ||
@@ -29,102 +31,100 @@ /** | ||
*/ | ||
Object.defineProperty(this, 'family', { | ||
get: function() { | ||
return isUnixSocket(this.value) ? Port.UNIX : Port.INET; | ||
} | ||
}); | ||
} | ||
/** | ||
* @property {String} UNIX | ||
* @memberOf {Port} | ||
* @readonly | ||
*/ | ||
/** | ||
* @property {String} INET | ||
* @memberOf {Port} | ||
* @readonly | ||
*/ | ||
['UNIX', 'INET'].forEach(function(family) { | ||
Object.defineProperty(Port, family, { | ||
value: family.toLowerCase(), | ||
enumerable: true | ||
}); | ||
}); | ||
/** | ||
* @param {*} port | ||
* @returns {Boolean} | ||
* @public | ||
*/ | ||
Port.prototype.isEqualTo = function(port) { | ||
if ( ! (port instanceof Port)) { | ||
return false; | ||
get family() { | ||
return isUnixSocket(this.value) ? Port.UNIX : Port.INET; | ||
} | ||
return this.value === port.value; | ||
}; | ||
/** | ||
* @param {*} port | ||
* @returns {Boolean} | ||
* @public | ||
*/ | ||
isEqualTo(port) { | ||
if (!(port instanceof Port)) { | ||
return false; | ||
} | ||
/** | ||
* @param {Number|String} [it=1] | ||
* @returns {Port} | ||
* @public | ||
*/ | ||
Port.prototype.next = function(it) { | ||
if (typeof it === 'undefined') { | ||
it = 1; | ||
return this.value === port.value; | ||
} | ||
var newVal = this.family === Port.UNIX ? | ||
this.value.replace(UNIX_SOCKET_MASK, it.toString()) : | ||
Number(this.value) + it; | ||
/** | ||
* @param {Number|String} [it=1] | ||
* @returns {Port} | ||
* @public | ||
*/ | ||
next(it) { | ||
if (typeof it === 'undefined') { | ||
it = 1; | ||
} | ||
return new Port(newVal); | ||
}; | ||
const newVal = this.family === Port.UNIX ? | ||
this.value.replace(UNIX_SOCKET_MASK, it.toString()) : | ||
Number(this.value) + it; | ||
/** | ||
* @param {Error} [err] | ||
* @param {Function} cb | ||
*/ | ||
Port.prototype.unlink = function(err, cb) { | ||
if ( ! cb && typeof err === 'function') { | ||
cb = err; | ||
err = undefined; | ||
return new Port(newVal); | ||
} | ||
if (err) { | ||
cb(LusterPortError | ||
.createError(LusterPortError.CODES.UNKNOWN_ERROR, err)); | ||
return; | ||
} | ||
/** | ||
* @param {Error} [err] | ||
* @param {Function} cb | ||
*/ | ||
unlink(err, cb) { | ||
if (!cb && typeof err === 'function') { | ||
cb = err; | ||
err = undefined; | ||
} | ||
var value = this.value; | ||
if (err) { | ||
cb(LusterPortError | ||
.createError(LusterPortError.CODES.UNKNOWN_ERROR, err)); | ||
return; | ||
} | ||
if (this.family !== Port.UNIX) { | ||
cb(LusterPortError | ||
.createError(LusterPortError.CODES.NOT_UNIX_SOCKET) | ||
.bind({ value: value })); | ||
return; | ||
} | ||
const value = this.value; | ||
fs.unlink(value, function(err) { | ||
if (err && err.code !== 'ENOENT') { | ||
if (this.family !== Port.UNIX) { | ||
cb(LusterPortError | ||
.createError(LusterPortError.CODES.CAN_NOT_UNLINK_UNIX_SOCKET, err) | ||
.bind({ socketPath: value })); | ||
.createError(LusterPortError.CODES.NOT_UNIX_SOCKET) | ||
.bind({value})); | ||
return; | ||
} | ||
cb(); | ||
}); | ||
}; | ||
fs.unlink(value, err => { | ||
if (err && err.code !== 'ENOENT') { | ||
cb(LusterPortError | ||
.createError(LusterPortError.CODES.CAN_NOT_UNLINK_UNIX_SOCKET, err) | ||
.bind({socketPath: value})); | ||
return; | ||
} | ||
Port.prototype.toString = function() { | ||
return this.value; | ||
}; | ||
cb(); | ||
}); | ||
} | ||
Port.prototype.valueOf = function() { | ||
return this.value; | ||
}; | ||
toString() { | ||
return this.value; | ||
} | ||
valueOf() { | ||
return this.value; | ||
} | ||
/** | ||
* @property {String} UNIX | ||
* @memberOf {Port} | ||
* @readonly | ||
*/ | ||
static get UNIX() { | ||
return 'unix'; | ||
} | ||
/** | ||
* @property {String} INET | ||
* @memberOf {Port} | ||
* @readonly | ||
*/ | ||
static get INET() { | ||
return 'inet'; | ||
} | ||
} | ||
module.exports = Port; |
@@ -1,4 +0,3 @@ | ||
var EventEmitterEx = require('./event_emitter_ex'), | ||
WorkerWrapper = require('./worker_wrapper'), | ||
RestartQueue; | ||
const EventEmitterEx = require('./event_emitter_ex'), | ||
WorkerWrapper = require('./worker_wrapper'); | ||
@@ -18,76 +17,82 @@ /** | ||
*/ | ||
RestartQueue = EventEmitterEx.create(function RestartQueue() { | ||
/** | ||
* @type {WorkerWrapper[]} | ||
* @private | ||
*/ | ||
this._queue = []; | ||
}); | ||
class RestartQueue extends EventEmitterEx { | ||
constructor(eexKey = undefined) { | ||
super(); | ||
/** | ||
* Adds new worker in restart queue. Does nothing if worker is already in queue. | ||
* @public | ||
* @param {WorkerWrapper} worker | ||
*/ | ||
RestartQueue.prototype.push = function(worker) { | ||
if (this.has(worker)) { | ||
// Worker is already in queue, do nothing | ||
return; | ||
/** | ||
* @type {WorkerWrapper[]} | ||
* @private | ||
*/ | ||
this._queue = []; | ||
this.eexKey = eexKey; | ||
} | ||
var removeWorker = function() { | ||
worker.removeListener('ready', removeWorker); | ||
worker.removeListener('state', checkDead); | ||
this._remove(worker); | ||
}.bind(this); | ||
var checkDead = function(state) { | ||
if (state === WorkerWrapper.STATES.STOPPED && worker.dead) { | ||
removeWorker(); | ||
/** | ||
* Adds new worker in restart queue. Does nothing if worker is already in queue. | ||
* @public | ||
* @param {WorkerWrapper} worker | ||
*/ | ||
push(worker) { | ||
if (this.has(worker)) { | ||
// Worker is already in queue, do nothing | ||
return; | ||
} | ||
}; | ||
worker.on('ready', removeWorker); | ||
worker.on('state', checkDead); | ||
this._queue.push(worker); | ||
this._process(); | ||
}; | ||
const removeWorker = () => { | ||
worker.removeListener('ready', removeWorker); | ||
worker.removeListener('state', checkDead); | ||
this._remove(worker); | ||
}; | ||
/** | ||
* Returns true if specified worker is in this queue. | ||
* @public | ||
* @param {WorkerWrapper} worker | ||
* @returns {Boolean} | ||
*/ | ||
RestartQueue.prototype.has = function(worker) { | ||
return this._queue.indexOf(worker) !== -1; | ||
}; | ||
const checkDead = state => { | ||
if (state === WorkerWrapper.STATES.STOPPED && worker.dead) { | ||
removeWorker(); | ||
} | ||
}; | ||
/** | ||
* Removes specified worker from queue | ||
* @private | ||
* @param {WorkerWrapper} worker | ||
*/ | ||
RestartQueue.prototype._remove = function(worker) { | ||
var idx = this._queue.indexOf(worker); | ||
this._queue.splice(idx, 1); | ||
this._process(); | ||
}; | ||
worker.on('ready', removeWorker); | ||
worker.on('state', checkDead); | ||
this._queue.push(worker); | ||
this._process(); | ||
} | ||
/** | ||
* Checks if any processing is needed. Should be called after each restart queue state change. | ||
* @private | ||
*/ | ||
RestartQueue.prototype._process = function() { | ||
if (this._queue.length === 0) { | ||
this.emit('drain'); | ||
return; | ||
/** | ||
* Returns true if specified worker is in this queue. | ||
* @public | ||
* @param {WorkerWrapper} worker | ||
* @returns {Boolean} | ||
*/ | ||
has(worker) { | ||
return this._queue.indexOf(worker) !== -1; | ||
} | ||
var head = this._queue[0]; | ||
if (head.ready) { | ||
head.restart(); | ||
/** | ||
* Removes specified worker from queue | ||
* @private | ||
* @param {WorkerWrapper} worker | ||
*/ | ||
_remove(worker) { | ||
const idx = this._queue.indexOf(worker); | ||
this._queue.splice(idx, 1); | ||
this._process(); | ||
} | ||
}; | ||
/** | ||
* Checks if any processing is needed. Should be called after each restart queue state change. | ||
* @private | ||
*/ | ||
_process() { | ||
if (this._queue.length === 0) { | ||
this.emit('drain'); | ||
return; | ||
} | ||
const head = this._queue[0]; | ||
if (head.ready) { | ||
head.restart(); | ||
} | ||
} | ||
} | ||
module.exports = RestartQueue; |
@@ -1,4 +0,4 @@ | ||
var LusterRPCCallbackError = require('./errors').LusterRPCCallbackError; | ||
const LusterRPCCallbackError = require('./errors').LusterRPCCallbackError; | ||
var RPCCallback = { | ||
const RPCCallback = { | ||
_storage: {}, | ||
@@ -15,5 +15,4 @@ | ||
*/ | ||
setCallback: function(proc, command, callback, timeout) { | ||
var self = this, | ||
storage = self._storage; | ||
setCallback(proc, command, callback, timeout) { | ||
const storage = this._storage; | ||
@@ -24,8 +23,8 @@ if ( ! timeout) { | ||
var callbackId = proc.wid + '_' + self._counter++; | ||
const callbackId = proc.wid + '_' + this._counter++; | ||
storage[callbackId] = { | ||
callback: callback, | ||
callback, | ||
timeout: | ||
setTimeout(function() { | ||
setTimeout(() => { | ||
storage[callbackId].callback( | ||
@@ -35,4 +34,4 @@ proc, | ||
LusterRPCCallbackError.CODES.REMOTE_CALL_WITH_CALLBACK_TIMEOUT, | ||
{ command: command })); | ||
self.removeCallback(callbackId); | ||
{ command })); | ||
this.removeCallback(callbackId); | ||
}, timeout) | ||
@@ -49,4 +48,4 @@ }; | ||
*/ | ||
processCallback: function(proc, callbackId, data) { | ||
var stored = this._storage[callbackId]; | ||
processCallback(proc, callbackId, data) { | ||
const stored = this._storage[callbackId]; | ||
@@ -57,5 +56,3 @@ if ( ! stored) { | ||
setImmediate(function() { | ||
stored.callback(proc, null, data); | ||
}); | ||
setImmediate(() => stored.callback(proc, null, data)); | ||
this.removeCallback(callbackId); | ||
@@ -67,4 +64,4 @@ }, | ||
*/ | ||
removeCallback: function(callbackId) { | ||
var timeout = this._storage[callbackId].timeout; | ||
removeCallback(callbackId) { | ||
const timeout = this._storage[callbackId].timeout; | ||
@@ -71,0 +68,0 @@ clearTimeout(timeout); |
/** | ||
* @type {{createCaller: Function, parseMessage: Function}} | ||
*/ | ||
var RPC = { | ||
const RPC = { | ||
/** | ||
@@ -9,8 +9,8 @@ * @param {Object} target must have `send` method | ||
*/ | ||
createCaller: function(target) { | ||
return function(name) { | ||
var message = { cmd: 'luster_' + name }; | ||
createCaller(target) { | ||
return function(name, ...args) { | ||
const message = { cmd: 'luster_' + name }; | ||
if (arguments.length > 1) { | ||
message.args = Array.prototype.slice.call(arguments, 1); | ||
if (args.length > 0) { | ||
message.args = args; | ||
} | ||
@@ -32,3 +32,3 @@ | ||
*/ | ||
parseMessage: function(message) { | ||
parseMessage(message) { | ||
if (message && | ||
@@ -35,0 +35,0 @@ typeof message.cmd === 'string' && |
@@ -1,2 +0,2 @@ | ||
var cluster = require('cluster'), | ||
const cluster = require('cluster'), | ||
RPC = require('./rpc'), | ||
@@ -6,22 +6,14 @@ RPCCallback = require('./rpc-callback'), | ||
Port = require('./port'), | ||
LusterWorkerWrapperError = require('./errors').LusterWorkerWrapperError, | ||
WorkerWrapper, | ||
LusterWorkerWrapperError = require('./errors').LusterWorkerWrapperError; | ||
/** | ||
* Identifier for next constructed WorkerWrapper. | ||
* Usage restricted to WorkerWrapper constructor only. | ||
* @type {Number} | ||
* @private | ||
*/ | ||
nextId = 0; | ||
/** | ||
* @event WorkerWrapper#state | ||
* @param {WorkerWrapperState} state Actual WorkerWrapper state | ||
* @see WorkerWrapper.STATES for possible `state` values. | ||
* Identifier for next constructed WorkerWrapper. | ||
* Usage restricted to WorkerWrapper constructor only. | ||
* @type {Number} | ||
* @private | ||
*/ | ||
let nextId = 0; | ||
/** | ||
* @memberOf WorkerWrapper | ||
* @typedef WorkerWrapperOptions | ||
* @class WorkerWrapperOptions | ||
* @property {Number|String} port Port number or socket path which worker can listen | ||
@@ -39,8 +31,43 @@ * @property {Boolean} [persistent=true] While `persistent === true` worker will be restarted on exit | ||
*/ | ||
class WorkerWrapperOptions { | ||
constructor(options) { | ||
this.persistent = typeof options.persistent === 'undefined' ? true : options.persistent; | ||
this.forkTimeout = options.forkTimeout; | ||
this.stopTimeout = options.stopTimeout; | ||
this.exitThreshold = options.exitThreshold; | ||
this.allowedSequentialDeaths = options.allowedSequentialDeaths || 0; | ||
this.port = options.port; | ||
this.workerEnv = options.workerEnv; | ||
} | ||
get port() { | ||
return this._port; | ||
} | ||
/** | ||
* Setter of `this.options.port` affects value of the `isListeningUnixSocket` property. | ||
* @memberOf WorkerWrapperOptions | ||
* @property {Number|String} port | ||
* @public | ||
*/ | ||
set port(value) { | ||
if (!(value instanceof Port)) { | ||
value = new Port(value); | ||
} | ||
return this._port = value; | ||
} | ||
} | ||
/** | ||
* @event WorkerWrapper#state | ||
* @param {WorkerWrapperState} state Actual WorkerWrapper state | ||
* @see WorkerWrapper.STATES for possible `state` values. | ||
*/ | ||
/** | ||
* @constructor | ||
* @class WorkerWrapper | ||
* @augments EventEmitterEx | ||
* @param {Master} master | ||
* @param {WorkerPool} pool | ||
* @param {WorkerWrapperOptions} options | ||
@@ -53,12 +80,13 @@ * | ||
*/ | ||
WorkerWrapper = EventEmitterEx.create(function WorkerWrapper(master, options) { | ||
WorkerWrapper.__super.call(this); | ||
class WorkerWrapper extends EventEmitterEx { | ||
constructor(pool, options) { | ||
super(); | ||
if (options && | ||
typeof options.maxListeners !== 'undefined' && | ||
options.maxListeners > this.getMaxListeners()) { | ||
this.setMaxListeners(options.maxListeners); | ||
} | ||
if (options && | ||
typeof options.maxListeners !== 'undefined' && | ||
options.maxListeners > this.getMaxListeners()) { | ||
this.setMaxListeners(options.maxListeners); | ||
} | ||
var /** | ||
/** | ||
* WorkerWrapper state. Must be set via private method `WorkerWrapper#_setState(value)`, | ||
@@ -69,42 +97,84 @@ * not directly. Can be retrieved via `WorkerWrapper#state` getter. | ||
*/ | ||
_state = WorkerWrapper.STATES.STOPPED, | ||
this._state = WorkerWrapper.STATES.STOPPED; | ||
/** @type {Number|String} */ | ||
_port; | ||
/** | ||
* @type WorkerWrapperOptions | ||
* @public | ||
* @readonly | ||
*/ | ||
this.options = new WorkerWrapperOptions(options); | ||
/** | ||
* @type WorkerWrapperOptions | ||
* @public | ||
* @readonly | ||
*/ | ||
this.options = { | ||
persistent: typeof options.persistent === 'undefined' ? true : options.persistent, | ||
forkTimeout: options.forkTimeout, | ||
stopTimeout: options.stopTimeout, | ||
exitThreshold: options.exitThreshold, | ||
allowedSequentialDeaths: options.allowedSequentialDeaths || 0 | ||
}; | ||
this._wid = ++nextId; | ||
this.eexKey = this._wid; | ||
/** | ||
* Setter of `this.options.port` affects value of the `isListeningUnixSocket` property. | ||
* @memberOf WorkerWrapper#options | ||
* @property {Number|String} port | ||
* @public | ||
*/ | ||
Object.defineProperty(this.options, 'port', { | ||
get: function() { | ||
return _port; | ||
}, | ||
set: function(value) { | ||
if ( ! (value instanceof Port)) { | ||
value = new Port(value); | ||
} | ||
/** | ||
* Indicates worker restarting in progress. | ||
* Changing `restarting` property value outside WorkerWrapper and it inheritors is not recommended. | ||
* @property {Boolean} restarting | ||
* @memberOf {WorkerWrapper} | ||
* @public | ||
*/ | ||
this.restarting = false; | ||
return _port = value; | ||
}, | ||
enumerable: true | ||
}); | ||
/** | ||
* Indicates worker stopping in progress. | ||
* Changing `stopping` property value outside WorkerWrapper and it inheritors is not recommended. | ||
* @property {Boolean} stopping | ||
* @memberOf {WorkerWrapper} | ||
* @public | ||
*/ | ||
this.stopping = false; | ||
this.options.port = options.port; | ||
/** | ||
* Worker can be marked as dead on sequential fails of launch attempt. | ||
* Dead worker will not be restarted on event WorkerWrapper#state('stopped'). | ||
* Internally in the WorkerWrapper worker can be marked as dead, but never go alive again. | ||
* To revive the worker something outside of the WorkerWrapper | ||
* must set the `dead` property value to `false`. | ||
* @public | ||
* @type {Boolean} | ||
*/ | ||
this.dead = false; | ||
/** | ||
* Number of sequential deaths when worker life time was less than `exitThreshold` option value. | ||
* @type {Number} | ||
* @private | ||
*/ | ||
this._sequentialDeaths = 0; | ||
/** | ||
* Time of the last WorkerWrapper#state('running') event. | ||
* @type {Number} | ||
*/ | ||
this.startTime = null; | ||
/** | ||
* Indicates whether ready() method was called in worker | ||
* @public | ||
* @type {Boolean} | ||
*/ | ||
this.ready = false; | ||
/** | ||
* @type {WorkerPool} | ||
* @private | ||
*/ | ||
this._pool = pool; | ||
/** | ||
* Listen for cluster#fork and worker events. | ||
* @see WorkerWrapper#_proxyEvents to know about repeating worker events on WorkerWrapper instance. | ||
*/ | ||
cluster.on('fork', this._onFork.bind(this)); | ||
this.on('online', this._onOnline.bind(this)); | ||
this.on('disconnect', this._onDisconnect.bind(this)); | ||
this.on('exit', this._onExit.bind(this)); | ||
WorkerWrapper._RPC_EVENTS.forEach(event => { | ||
pool.master.on('received worker ' + event, this.eventTranslator.bind(this, event)); | ||
}); | ||
this.on('ready', this._onReady.bind(this)); | ||
} | ||
/** | ||
@@ -116,6 +186,5 @@ * @property {Number} wid Persistent WorkerWrapper identifier | ||
*/ | ||
Object.defineProperty(this, 'wid', { | ||
value: ++nextId, | ||
enumerable: true | ||
}); | ||
get wid() { | ||
return this._wid; | ||
} | ||
@@ -128,8 +197,5 @@ /** | ||
*/ | ||
Object.defineProperty(this, 'pid', { | ||
get: function() { | ||
return this.process.pid; | ||
}, | ||
enumerable: true | ||
}); | ||
get pid() { | ||
return this.process.pid; | ||
} | ||
@@ -144,10 +210,17 @@ /** | ||
*/ | ||
Object.defineProperty(this, 'state', { | ||
get: function() { | ||
return _state; | ||
}, | ||
enumerable: true | ||
}); | ||
get state() { | ||
return this._state; | ||
} | ||
/** | ||
* Pool this WorkerWrapper belongs to | ||
* @memberOf {WorkerWrapper} | ||
* @public | ||
* @readonly | ||
*/ | ||
get pool() { | ||
return this._pool; | ||
} | ||
/** | ||
* @see WorkerWrapper.STATES for possible `state` argument values | ||
@@ -158,4 +231,4 @@ * @param {WorkerWrapperState} state | ||
*/ | ||
this._setState = function(state) { | ||
_state = state; | ||
_setState(state) { | ||
this._state = state; | ||
@@ -165,251 +238,363 @@ this['_onState' + state[0].toUpperCase() + state.slice(1)](); | ||
this.emit('state', state); | ||
}; | ||
} | ||
eventTranslator(event, worker) { | ||
if (this._worker && worker.id === this._worker.id) { | ||
this.emit(event); | ||
} | ||
} | ||
_onReady() { | ||
this.ready = true; | ||
} | ||
/** | ||
* Indicates worker restarting in progress. | ||
* Changing `restarting` property value outside WorkerWrapper and it inheritors is not recommended. | ||
* @property {Boolean} restarting | ||
* @memberOf {WorkerWrapper} | ||
* @public | ||
* @private | ||
*/ | ||
this.restarting = false; | ||
_onExit() { | ||
this.ready = false; | ||
this._setState(WorkerWrapper.STATES.STOPPED); | ||
} | ||
/** | ||
* Indicates worker stopping in progress. | ||
* Changing `stopping` property value outside WorkerWrapper and it inheritors is not recommended. | ||
* @property {Boolean} stopping | ||
* @memberOf {WorkerWrapper} | ||
* @public | ||
* event:_worker#disconnect handler | ||
* @private | ||
*/ | ||
this.stopping = false; | ||
_onDisconnect() { | ||
this.ready = false; | ||
this._setState(WorkerWrapper.STATES.STOPPING); | ||
} | ||
/** | ||
* Worker can be marked as dead on sequential fails of launch attempt. | ||
* Dead worker will not be restarted on event WorkerWrapper#state('stopped'). | ||
* Internally in the WorkerWrapper worker can be marked as dead, but never go alive again. | ||
* To revive the worker something outside of the WorkerWrapper | ||
* must set the `dead` property value to `false`. | ||
* @public | ||
* @type {Boolean} | ||
* @event WorkerWrapper#ready | ||
*/ | ||
this.dead = false; | ||
/** | ||
* Number of sequential deaths when worker life time was less than `exitThreshold` option value. | ||
* @type {Number} | ||
* event:_worker#online handler | ||
* @fires WorkerWrapper#ready | ||
* @private | ||
*/ | ||
this._sequentialDeaths = 0; | ||
_onOnline() { | ||
this._setState(WorkerWrapper.STATES.RUNNING); | ||
// pass some of the {WorkerWrapper} properties to {Worker} | ||
this.remoteCall(RPC.fns.worker.applyForeignProperties, { | ||
pid: this.process.pid | ||
}); | ||
} | ||
/** | ||
* Time of the last WorkerWrapper#state('running') event. | ||
* @type {Number} | ||
* event:cluster#fork handler | ||
* @private | ||
*/ | ||
this.startTime = null; | ||
_onFork(worker) { | ||
if (this._worker && worker.id === this._worker.id) { | ||
this.emit('fork'); | ||
this._setState(WorkerWrapper.STATES.LAUNCHING); | ||
} | ||
} | ||
/** | ||
* Indicates whether ready() method was called in worker | ||
* @public | ||
* @type {Boolean} | ||
* event:state('launching') handler | ||
* @private | ||
*/ | ||
this.ready = false; | ||
_onStateLaunching() { | ||
this.restarting = false; | ||
if (this.options.forkTimeout) { | ||
this.launchTimeout = setTimeout(() => { | ||
this.launchTimeout = null; | ||
if (this._worker !== null) { | ||
this._worker.kill(); | ||
} | ||
}, this.options.forkTimeout); | ||
} | ||
} | ||
/** | ||
* @type {Master} | ||
* event:state('running') handler | ||
* @private | ||
*/ | ||
this._master = master; | ||
_onStateRunning() { | ||
if (this.launchTimeout) { | ||
clearTimeout(this.launchTimeout); | ||
this.launchTimeout = null; | ||
} | ||
this.startTime = Date.now(); | ||
} | ||
/** | ||
* Listen for cluster#fork and worker events. | ||
* @see WorkerWrapper#_proxyEvents to know about repeating worker events on WorkerWrapper instance. | ||
* event:state('stopping') handler | ||
* @private | ||
*/ | ||
cluster.on('fork', this._onFork.bind(this)); | ||
this.on('online', this._onOnline.bind(this)); | ||
this.on('disconnect', this._onDisconnect.bind(this)); | ||
this.on('exit', this._onExit.bind(this)); | ||
// register global remote command in the context of master to receive events from master | ||
if ( ! master.hasRegisteredRemoteCommand(RPC.fns.master.broadcastWorkerEvent)) { | ||
master.registerRemoteCommand( | ||
RPC.fns.master.broadcastWorkerEvent, | ||
WorkerWrapper.broadcastWorkerEvent.bind(master) | ||
); | ||
_onStateStopping() { | ||
this._scheduleForceStop(); | ||
} | ||
WorkerWrapper._RPC_EVENTS.forEach(function(event) { | ||
master.on('received worker ' + event, WorkerWrapper.createEventTranslator(event).bind(this)); | ||
}, this); | ||
this.on('ready', this._onReady.bind(this)); | ||
}); | ||
/** | ||
* event:state('stopped') handler | ||
* @private | ||
*/ | ||
_onStateStopped() { | ||
this._cancelForceStop(); | ||
WorkerWrapper.createEventTranslator = function(event) { | ||
return /** @this {WorkerWrapper} */function(worker) { | ||
if (this._worker && worker.id === this._worker.id) { | ||
this.emit(event); | ||
// increase sequential deaths count if worker life time less | ||
// than `exitThreshold` option value (and option was passed to constructor). | ||
if (this.options.exitThreshold && | ||
Date.now() - this.startTime < this.options.exitThreshold && | ||
!this.restarting) { | ||
this._sequentialDeaths++; | ||
} | ||
}; | ||
}; | ||
WorkerWrapper.prototype._onReady = function() { | ||
this.ready = true; | ||
}; | ||
// mark worker as dead if too much sequential deaths | ||
if (this._sequentialDeaths > this.options.allowedSequentialDeaths) { | ||
this.dead = true; | ||
} | ||
/** | ||
* Broadcast an event received by IPC from worker as 'worker <event>'. | ||
* @this {Master} | ||
* @param {WorkerWrapper} worker | ||
* @param {String} event | ||
*/ | ||
WorkerWrapper.broadcastWorkerEvent = function(worker, event) { | ||
var args = ['received worker ' + event, worker], | ||
i = 2, | ||
len = arguments.length; | ||
this._worker = null; | ||
for (; i < len; i++) { | ||
args.push(arguments[i]); | ||
// start worker again if it persistent or in the restarting state | ||
// and isn't marked as dead | ||
if (((this.options.persistent && !this.stopping) || this.restarting) && !this.dead) { | ||
setImmediate(this.run.bind(this)); | ||
} | ||
this.stopping = false; | ||
} | ||
this.emit.apply(this, args); | ||
}; | ||
/** | ||
* @returns {Boolean} | ||
*/ | ||
isRunning() { | ||
return this.state === WorkerWrapper.STATES.LAUNCHING || | ||
this.state === WorkerWrapper.STATES.RUNNING; | ||
} | ||
/** | ||
* Possible WorkerWrapper instance states. | ||
* @property {Object} STATES | ||
* @memberOf WorkerWrapper | ||
* @typedef WorkerWrapperState | ||
* @enum | ||
* @readonly | ||
* @public | ||
* @static | ||
*/ | ||
Object.defineProperty(WorkerWrapper, 'STATES', { | ||
value: Object.freeze({ | ||
STOPPED: 'stopped', | ||
LAUNCHING: 'launching', | ||
RUNNING: 'running', | ||
STOPPING: 'stopping' | ||
}), | ||
enumerable: true | ||
}); | ||
/** | ||
* @event WorkerWrapper#error | ||
* @param {LusterError} error | ||
*/ | ||
/** | ||
* @private | ||
*/ | ||
WorkerWrapper.prototype._onExit = function() { | ||
this.ready = false; | ||
this._setState(WorkerWrapper.STATES.STOPPED); | ||
}; | ||
/** | ||
* Spawn a worker | ||
* @fires WorkerWrapper#error if worker already running | ||
* @fires WorkerWrapper#state('launching') on success | ||
* @returns {WorkerWrapper} self | ||
*/ | ||
run() { | ||
if (this.isRunning()) { | ||
this.emit('error', | ||
LusterWorkerWrapperError.createError( | ||
LusterWorkerWrapperError.CODES.INVALID_ATTEMPT_TO_CHANGE_STATE, | ||
{ | ||
wid: this.wid, | ||
pid: this.process.pid, | ||
state: this.state, | ||
targetState: WorkerWrapper.STATES.LAUNCHING | ||
})); | ||
/** | ||
* event:_worker#disconnect handler | ||
* @private | ||
*/ | ||
WorkerWrapper.prototype._onDisconnect = function() { | ||
this.ready = false; | ||
this._setState(WorkerWrapper.STATES.STOPPING); | ||
}; | ||
return this; | ||
} | ||
/** | ||
* @event WorkerWrapper#ready | ||
*/ | ||
setImmediate(() => { | ||
/** @private */ | ||
this._worker = cluster.fork(Object.assign({ | ||
port: this.options.port, | ||
LUSTER_WID: this.wid, | ||
}, this.options.workerEnv)); | ||
/** | ||
* event:_worker#online handler | ||
* @fires WorkerWrapper#ready | ||
* @private | ||
*/ | ||
WorkerWrapper.prototype._onOnline = function() { | ||
this._setState(WorkerWrapper.STATES.RUNNING); | ||
/** @private */ | ||
this._remoteCall = RPC.createCaller(this._worker); | ||
// pass some of the {WorkerWrapper} properties to {Worker} | ||
this.remoteCall(RPC.fns.worker.applyForeignProperties, { | ||
pid: this.process.pid | ||
}); | ||
}; | ||
this._proxyEvents(); | ||
}); | ||
/** | ||
* event:cluster#fork handler | ||
* @private | ||
*/ | ||
WorkerWrapper.prototype._onFork = function(worker) { | ||
if (this._worker && worker.id === this._worker.id) { | ||
this.emit('fork'); | ||
this._setState(WorkerWrapper.STATES.LAUNCHING); | ||
return this; | ||
} | ||
}; | ||
/** | ||
* event:state('launching') handler | ||
* @private | ||
*/ | ||
WorkerWrapper.prototype._onStateLaunching = function() { | ||
var self = this; | ||
/** | ||
* Disconnect worker to stop it. | ||
* @fires WorkerWrapper#error if worker status is 'stopped' or 'stopping' | ||
* @fires WorkerWrapper#status('stopping') on success | ||
* @returns {WorkerWrapper} | ||
*/ | ||
stop() { | ||
if (!this.isRunning()) { | ||
this.emit('error', | ||
LusterWorkerWrapperError.createError( | ||
LusterWorkerWrapperError.CODES.INVALID_ATTEMPT_TO_CHANGE_STATE, | ||
{ | ||
wid: this.wid, | ||
pid: this.process.pid, | ||
state: this.state, | ||
targetState: WorkerWrapper.STATES.STOPPING | ||
})); | ||
this.restarting = false; | ||
return this; | ||
} | ||
if (this.options.forkTimeout) { | ||
this.launchTimeout = setTimeout(function() { | ||
self.launchTimeout = null; | ||
this.stopping = true; | ||
if (self._worker !== null) { | ||
self._worker.kill(); | ||
setImmediate(() => { | ||
// state can be changed before function call | ||
if (this.isRunning()) { | ||
this._worker.disconnect(); | ||
this._scheduleForceStop(); | ||
} | ||
}, this.options.forkTimeout); | ||
}); | ||
return this; | ||
} | ||
}; | ||
/** | ||
* event:state('running') handler | ||
* @private | ||
*/ | ||
WorkerWrapper.prototype._onStateRunning = function() { | ||
if (this.launchTimeout) { | ||
clearTimeout(this.launchTimeout); | ||
this.launchTimeout = null; | ||
/** | ||
* Set WorkerWrapper#restarting to `true` and stop it, | ||
* which leads to worker restart. | ||
*/ | ||
restart() { | ||
this.restarting = true; | ||
this.stop(); | ||
} | ||
this.startTime = Date.now(); | ||
}; | ||
/** | ||
* Call Worker method via RPC | ||
* @method | ||
* @param {String} name of called command in the worker | ||
* @param {...*} args | ||
*/ | ||
remoteCall(name, ...args) { | ||
if (this.isRunning()) { | ||
this._remoteCall(name, ...args); | ||
} else { | ||
this.emit('error', | ||
LusterWorkerWrapperError.createError( | ||
LusterWorkerWrapperError.CODES.REMOTE_COMMAND_CALL_TO_STOPPED_WORKER, | ||
{ | ||
wid: this.wid, | ||
pid: this.process.pid, | ||
command: name | ||
})); | ||
} | ||
} | ||
/** | ||
* event:state('stopping') handler | ||
* @private | ||
*/ | ||
WorkerWrapper.prototype._onStateStopping = function() { | ||
this._scheduleForceStop(); | ||
}; | ||
// proxy some properties to WorkerWrapper#_worker | ||
/** | ||
* event:state('stopped') handler | ||
* @private | ||
*/ | ||
WorkerWrapper.prototype._onStateStopped = function() { | ||
this._cancelForceStop(); | ||
get id() { | ||
return this._worker.id; | ||
} | ||
// increase sequential deaths count if worker life time less | ||
// than `exitThreshold` option value (and option was passed to constructor). | ||
if (this.options.exitThreshold && | ||
Date.now() - this.startTime < this.options.exitThreshold && | ||
! this.restarting) { | ||
this._sequentialDeaths++; | ||
get process() { | ||
return this._worker.process; | ||
} | ||
// mark worker as dead if too much sequential deaths | ||
if (this._sequentialDeaths > this.options.allowedSequentialDeaths) { | ||
this.dead = true; | ||
get suicide() { | ||
return this._worker.suicide; | ||
} | ||
this._worker = null; | ||
// proxy some methods to WorkerWrapper#_worker | ||
send(...args) { | ||
this._worker.send(...args); | ||
} | ||
// start worker again if it persistent or in the restarting state | ||
// and isn't marked as dead | ||
if (((this.options.persistent && ! this.stopping) || this.restarting) && ! this.dead) { | ||
setImmediate(this.run.bind(this)); | ||
disconnect() { | ||
this._worker.disconnect(); | ||
} | ||
this.stopping = false; | ||
}; | ||
/** | ||
* repeat events from WorkerWrapper#_worker on WorkerWrapper | ||
* @private | ||
*/ | ||
_proxyEvents() { | ||
WorkerWrapper._PROXY_EVENTS | ||
.forEach(eventName => { | ||
this._worker.on(eventName, this.emit.bind(this, eventName)); | ||
}); | ||
} | ||
inspect() { | ||
return 'WW{ id:' + this.wid + ', state: ' + this.state + '}'; | ||
} | ||
broadcastEvent(...args) { | ||
//TODO args here passed as single array, but remoteCall can handle multiple args | ||
this.remoteCall(RPC.fns.worker.broadcastMasterEvent, args); | ||
} | ||
/** | ||
* Do a remote call to worker, wait for worker to handle it, then execute registered callback | ||
* @method | ||
* @param {String} opts.command | ||
* @param {Function} opts.callback | ||
* @param {Number} [opts.timeout] in milliseconds | ||
* @param {*} [opts.data] | ||
* @public | ||
*/ | ||
remoteCallWithCallback(opts) { | ||
const callbackId = RPCCallback.setCallback(this, opts.command, opts.callback, opts.timeout); | ||
this.remoteCall(opts.command, opts.data, callbackId); | ||
} | ||
/** | ||
* Schedule a forceful worker stop using signal. | ||
* Only schedules timeout if it was not set yet. | ||
* @private | ||
*/ | ||
_scheduleForceStop() { | ||
// We could schedule force stop either when `stop` method is called or when `disconnected` event received from | ||
// worker. In most cases `stop` will be called and then `disconnected` event will fire, therefore we shall | ||
// make sure we do not re-run the force stop timer. | ||
if (this.options.stopTimeout && !this.stopTimeout) { | ||
this.stopTimeout = setTimeout(() => { | ||
this.stopTimeout = null; | ||
if (this._worker !== null) { | ||
this._worker.process.kill(); | ||
} | ||
}, this.options.stopTimeout); | ||
} | ||
} | ||
/** | ||
* Clears a forceful worker stop. | ||
* @private | ||
*/ | ||
_cancelForceStop() { | ||
if (this.stopTimeout) { | ||
clearTimeout(this.stopTimeout); | ||
this.stopTimeout = null; | ||
} | ||
} | ||
/** | ||
* Adds this worker to pool's restart queue | ||
* @public | ||
*/ | ||
softRestart() { | ||
this._pool.scheduleWorkerRestart(this); | ||
} | ||
} | ||
/** | ||
* Possible WorkerWrapper instance states. | ||
* @property {Object} STATES | ||
* @memberOf WorkerWrapper | ||
* @typedef WorkerWrapperState | ||
* @enum | ||
* @readonly | ||
* @public | ||
* @static | ||
*/ | ||
Object.defineProperty(WorkerWrapper, 'STATES', { | ||
value: Object.freeze({ | ||
STOPPED: 'stopped', | ||
LAUNCHING: 'launching', | ||
RUNNING: 'running', | ||
STOPPING: 'stopping' | ||
}), | ||
enumerable: true | ||
}); | ||
/** | ||
* Events to repeat from WorkerWrapper#_worker on WorkerWrapper instance | ||
@@ -422,3 +607,9 @@ * @memberOf WorkerWrapper | ||
Object.defineProperty(WorkerWrapper, '_PROXY_EVENTS', { | ||
value: Object.freeze(['message', 'online', 'listening', 'disconnect', 'exit']), | ||
value: Object.freeze([ | ||
'message', | ||
'online', | ||
'listening', | ||
'disconnect', | ||
'exit' | ||
]), | ||
enumerable: true | ||
@@ -460,211 +651,2 @@ }); | ||
/** | ||
* @returns {Boolean} | ||
*/ | ||
WorkerWrapper.prototype.isRunning = function() { | ||
return this.state === WorkerWrapper.STATES.LAUNCHING || | ||
this.state === WorkerWrapper.STATES.RUNNING; | ||
}; | ||
/** | ||
* @event WorkerWrapper#error | ||
* @param {LusterError} error | ||
*/ | ||
/** | ||
* Spawn a worker | ||
* @fires WorkerWrapper#error if worker already running | ||
* @fires WorkerWrapper#state('launching') on success | ||
* @returns {WorkerWrapper} self | ||
*/ | ||
WorkerWrapper.prototype.run = function() { | ||
if (this.isRunning()) { | ||
this.emit('error', | ||
LusterWorkerWrapperError.createError( | ||
LusterWorkerWrapperError.CODES.INVALID_ATTEMPT_TO_CHANGE_STATE, | ||
{ | ||
wid: this.wid, | ||
pid: this.process.pid, | ||
state: this.state, | ||
targetState: WorkerWrapper.STATES.LAUNCHING | ||
})); | ||
return this; | ||
} | ||
var self = this; | ||
setImmediate(function() { | ||
/** @private */ | ||
self._worker = cluster.fork({ | ||
port: self.options.port, | ||
LUSTER_WID: self.wid, | ||
}); | ||
/** @private */ | ||
self._remoteCall = RPC.createCaller(self._worker); | ||
self._proxyEvents(); | ||
}); | ||
return this; | ||
}; | ||
/** | ||
* Disconnect worker to stop it. | ||
* @fires WorkerWrapper#error if worker status is 'stopped' or 'stopping' | ||
* @fires WorkerWrapper#status('stopping') on success | ||
* @returns {WorkerWrapper} | ||
*/ | ||
WorkerWrapper.prototype.stop = function() { | ||
if ( ! this.isRunning()) { | ||
this.emit('error', | ||
LusterWorkerWrapperError.createError( | ||
LusterWorkerWrapperError.CODES.INVALID_ATTEMPT_TO_CHANGE_STATE, | ||
{ | ||
wid: this.wid, | ||
pid: this.process.pid, | ||
state: this.state, | ||
targetState: WorkerWrapper.STATES.STOPPING | ||
})); | ||
return this; | ||
} | ||
this.stopping = true; | ||
var self = this; | ||
setImmediate(function() { | ||
// state can be changed before function call | ||
if (self.isRunning()) { | ||
self._worker.disconnect(); | ||
self._scheduleForceStop(); | ||
} | ||
}); | ||
return this; | ||
}; | ||
/** | ||
* Set WorkerWrapper#restarting to `true` and stop it, | ||
* which leads to worker restart. | ||
*/ | ||
WorkerWrapper.prototype.restart = function() { | ||
this.restarting = true; | ||
this.stop(); | ||
}; | ||
/** | ||
* Call Worker method via RPC | ||
* @method | ||
* @param {String} name of called command in the worker | ||
* @param {*} ...args | ||
*/ | ||
WorkerWrapper.prototype.remoteCall = function(name) { | ||
if (this.isRunning()) { | ||
this._remoteCall.apply(this, arguments); | ||
} else { | ||
this.emit('error', | ||
LusterWorkerWrapperError.createError( | ||
LusterWorkerWrapperError.CODES.REMOTE_COMMAND_CALL_TO_STOPPED_WORKER, | ||
{ | ||
wid: this.wid, | ||
pid: this.process.pid, | ||
command: name | ||
})); | ||
} | ||
}; | ||
// proxy some properties to WorkerWrapper#_worker | ||
['id', 'process', 'suicide'].forEach(function(propName) { | ||
Object.defineProperty(WorkerWrapper.prototype, propName, { | ||
get: function() { | ||
return this._worker[propName]; | ||
}, | ||
enumerable: true | ||
}); | ||
}); | ||
// proxy some methods to WorkerWrapper#_worker | ||
['send', 'disconnect'].forEach(function(methodName) { | ||
WorkerWrapper.prototype[methodName] = function() { | ||
this._worker[methodName].apply(this._worker, arguments); | ||
}; | ||
}); | ||
/** | ||
* repeat events from WorkerWrapper#_worker on WorkerWrapper | ||
* @private | ||
*/ | ||
WorkerWrapper.prototype._proxyEvents = function() { | ||
WorkerWrapper._PROXY_EVENTS | ||
.forEach(function(eventName) { | ||
this._worker.on(eventName, this.emit.bind(this, eventName)); | ||
}, this); | ||
}; | ||
WorkerWrapper.prototype.inspect = function() { | ||
return 'WW{ id:' + this.wid + ', state: ' + this.state + '}'; | ||
}; | ||
WorkerWrapper.prototype.broadcastEvent = function() { | ||
this.remoteCall(RPC.fns.worker.broadcastMasterEvent, Array.prototype.slice.call(arguments, 0)); | ||
}; | ||
/** | ||
* Do a remote call to worker, wait for worker to handle it, then execute registered callback | ||
* @method | ||
* @param {String} opts.command | ||
* @param {Function} opts.callback | ||
* @param {Number} [opts.timeout] in milliseconds | ||
* @param {*} [opts.data] | ||
* @public | ||
*/ | ||
WorkerWrapper.prototype.remoteCallWithCallback = function(opts) { | ||
var callbackId = RPCCallback.setCallback(this, opts.command, opts.callback, opts.timeout); | ||
this.remoteCall(opts.command, opts.data, callbackId); | ||
}; | ||
/** | ||
* Schedule a forceful worker stop using signal. | ||
* Only schedules timeout if it was not set yet. | ||
* @private | ||
*/ | ||
WorkerWrapper.prototype._scheduleForceStop = function() { | ||
// We could schedule force stop either when `stop` method is called or when `disconnected` event received from | ||
// worker. In most cases `stop` will be called and then `disconnected` event will fire, therefore we shall | ||
// make sure we do not re-run the force stop timer. | ||
if (this.options.stopTimeout && ! this.stopTimeout) { | ||
var self = this; | ||
this.stopTimeout = setTimeout(function() { | ||
self.stopTimeout = null; | ||
if (self._worker !== null) { | ||
self._worker.process.kill(); | ||
} | ||
}, this.options.stopTimeout); | ||
} | ||
}; | ||
/** | ||
* Clears a forceful worker stop. | ||
* @private | ||
*/ | ||
WorkerWrapper.prototype._cancelForceStop = function() { | ||
if (this.stopTimeout) { | ||
clearTimeout(this.stopTimeout); | ||
this.stopTimeout = null; | ||
} | ||
}; | ||
/** | ||
* Adds this worker to master's restart queue | ||
* @public | ||
*/ | ||
WorkerWrapper.prototype.softRestart = function() { | ||
this._master.scheduleWorkerRestart(this); | ||
}; | ||
module.exports = WorkerWrapper; |
@@ -1,8 +0,11 @@ | ||
var cluster = require('cluster'), | ||
const cluster = require('cluster'), | ||
RPC = require('./rpc'), | ||
RPCCallback = require('./rpc-callback'), | ||
ClusterProcess = require('./cluster_process'), | ||
LusterWorkerError = require('./errors').LusterWorkerError, | ||
Worker; | ||
LusterWorkerError = require('./errors').LusterWorkerError; | ||
const pEvent = require('p-event'); | ||
const wid = parseInt(process.env.LUSTER_WID, 10); | ||
/** | ||
@@ -13,5 +16,25 @@ * @constructor | ||
*/ | ||
Worker = ClusterProcess.create(function Worker() { | ||
Worker.__super.apply(this, arguments); | ||
class Worker extends ClusterProcess { | ||
constructor() { | ||
super(); | ||
this.eexKey = wid; | ||
const broadcastEvent = this._broadcastEvent; | ||
this._foreignPropertiesReceivedPromise = pEvent(this, 'foreign properties received'); | ||
this.on('configured', broadcastEvent.bind(this, 'configured')); | ||
this.on('extension loaded', broadcastEvent.bind(this, 'extension loaded')); | ||
this.on('initialized', broadcastEvent.bind(this, 'initialized')); | ||
this.on('loaded', broadcastEvent.bind(this, 'loaded')); | ||
this.on('ready', broadcastEvent.bind(this, 'ready')); | ||
cluster.worker.on('disconnect', this.emit.bind(this, 'disconnect')); | ||
this._ready = false; | ||
this.registerRemoteCommand(RPC.fns.worker.applyForeignProperties, this.applyForeignProperties.bind(this)); | ||
this.registerRemoteCommand(RPC.fns.worker.broadcastMasterEvent, this.broadcastMasterEvent.bind(this)); | ||
} | ||
/** | ||
@@ -23,6 +46,5 @@ * @memberOf {Worker} | ||
*/ | ||
Object.defineProperty(this, 'wid', { | ||
value: parseInt(process.env.LUSTER_WID, 10), | ||
enumerable: true | ||
}); | ||
get wid(){ | ||
return wid; | ||
} | ||
@@ -36,113 +58,119 @@ /** | ||
*/ | ||
Object.defineProperty(this, 'id', { | ||
value: cluster.worker.id, | ||
enumerable: true | ||
}); | ||
get id() { | ||
return cluster.worker.id; | ||
} | ||
var broadcastEvent = this._broadcastEvent; | ||
/** | ||
* Emit an event received from the master as 'master <event>'. | ||
*/ | ||
broadcastMasterEvent(proc, emitArgs) { | ||
const [eventName, ...eventArgs] = emitArgs; | ||
this.emit('master ' + eventName, ...eventArgs); | ||
} | ||
this._foreignPropertiesReceived = false; | ||
this.on('foreign properties received', function() { this._foreignPropertiesReceived = true; }.bind(this)); | ||
/** | ||
* Transmit worker event to master, which plays as relay, | ||
* retransmitting it as 'worker <event>' to all master-side listeners. | ||
* @param {String} event Event name | ||
* @param {...*} args | ||
* @private | ||
*/ | ||
_broadcastEvent(event, ...args) { | ||
this.remoteCall(RPC.fns.master.broadcastWorkerEvent, event, ...args); | ||
} | ||
this.on('configured', broadcastEvent.bind(this, 'configured')); | ||
this.on('extension loaded', broadcastEvent.bind(this, 'extension loaded')); | ||
this.on('initialized', broadcastEvent.bind(this, 'initialized')); | ||
this.on('loaded', broadcastEvent.bind(this, 'loaded')); | ||
this.on('ready', broadcastEvent.bind(this, 'ready')); | ||
cluster.worker.on('disconnect', this.emit.bind(this, 'disconnect')); | ||
/** | ||
* Extend {Worker} properties with passed by {Master}. | ||
* @param {ClusterProcess} proc | ||
* @param {*} props | ||
*/ | ||
applyForeignProperties(proc, props) { | ||
for (const propName of Object.keys(props)) { | ||
Object.defineProperty(this, propName, { | ||
value: props[propName], | ||
enumerable: true | ||
}); | ||
} | ||
this.emit('foreign properties received'); | ||
} | ||
this._ready = false; | ||
whenForeignPropertiesReceived() { | ||
return this._foreignPropertiesReceivedPromise; | ||
} | ||
this.registerRemoteCommand(RPC.fns.worker.applyForeignProperties, this.applyForeignProperties.bind(this)); | ||
this.registerRemoteCommand(RPC.fns.worker.broadcastMasterEvent, this.broadcastMasterEvent.bind(this)); | ||
}); | ||
/** | ||
* @override | ||
* @see ClusterProcess | ||
* @private | ||
*/ | ||
_setupIPCMessagesHandler() { | ||
process.on('message', this._onMessage.bind(this, this)); | ||
} | ||
/** | ||
* Emit an event received from the master as 'master <event>'. | ||
*/ | ||
Worker.prototype.broadcastMasterEvent = function(proc, emitArgs) { | ||
var args = ['master ' + emitArgs[0]].concat(emitArgs.slice(1)); | ||
this.emit.apply(this, args); | ||
}; | ||
/** | ||
* Turns worker to `ready` state. Must be called by worker | ||
* if option `control.triggerReadyStateManually` set `true`. | ||
* @returns {Worker} self | ||
* @public | ||
*/ | ||
ready() { | ||
if (this._ready) { | ||
throw new LusterWorkerError(LusterWorkerError.CODES.ALREADY_READY); | ||
} | ||
/** | ||
* Transmit worker event to master, which plays as relay, | ||
* retransmitting it as 'worker <event>' to all master-side listeners. | ||
* @param {String} event Event name | ||
* @private | ||
*/ | ||
Worker.prototype._broadcastEvent = function(event) { // eslint-disable-line no-unused-vars | ||
var args = [RPC.fns.master.broadcastWorkerEvent], | ||
i = 0, | ||
len = arguments.length; | ||
this._ready = true; | ||
this.emit('ready'); | ||
for (; i < len; i++) { | ||
args.push(arguments[i]); | ||
return this; | ||
} | ||
this.remoteCall.apply(this, args); | ||
}; | ||
/** | ||
* Do a remote call to master, wait for master to handle it, then execute registered callback | ||
* @method | ||
* @param {String} opts.command | ||
* @param {Function} opts.callback | ||
* @param {Number} [opts.timeout] in milliseconds | ||
* @param {*} [opts.data] | ||
* @public | ||
*/ | ||
remoteCallWithCallback(opts) { | ||
const callbackId = RPCCallback.setCallback(this, opts.command, opts.callback, opts.timeout); | ||
/** | ||
* Extend {Worker} properties with passed by {Master}. | ||
* @param {ClusterProcess} proc | ||
* @param {*} props | ||
*/ | ||
Worker.prototype.applyForeignProperties = function(proc, props) { | ||
Object.keys(props) | ||
.forEach(function(propName) { | ||
Object.defineProperty(this, propName, { | ||
value: props[propName], | ||
enumerable: true | ||
}); | ||
}, this); | ||
this.emit('foreign properties received'); | ||
}; | ||
this.remoteCall(opts.command, opts.data, callbackId); | ||
} | ||
/** | ||
* @param {Function} fn | ||
*/ | ||
Worker.whenForeignPropertiesReceived = function(fn) { | ||
return /** @this {Worker} */function() { | ||
if (this._foreignPropertiesReceived) { | ||
setImmediate(fn.bind(this)); | ||
} else { | ||
this.once('foreign properties received', fn.bind(this)); | ||
} | ||
async _run() { | ||
await this.whenInitialized(); | ||
await this.whenForeignPropertiesReceived(); | ||
return this; | ||
}; | ||
}; | ||
const workerBase = this.config.resolve('app'); | ||
/** | ||
* `Require` application main script. | ||
* Execution will be delayed until Worker became configured | ||
* (`configured` event fired). | ||
* @returns {Worker} self | ||
* @public | ||
*/ | ||
Worker.prototype.run = Worker.whenInitialized( | ||
Worker.whenForeignPropertiesReceived(function() { | ||
var workerBase = this.config.resolve('app'); | ||
require(workerBase); | ||
try { | ||
require(workerBase); | ||
} catch (e) { | ||
console.error(`Worker failed on require ${workerBase}`); | ||
console.error(e); | ||
process.exit(1); | ||
} | ||
this.emit('loaded', workerBase); | ||
if ( ! this.config.get('control.triggerReadyStateManually', false)) { | ||
if (!this.config.get('control.triggerReadyStateManually', false)) { | ||
setImmediate(this.ready.bind(this)); | ||
} | ||
} | ||
/** | ||
* `Require` application main script. | ||
* Execution will be delayed until Worker became configured | ||
* (`configured` event fired). | ||
* @returns {Worker} self | ||
* @public | ||
*/ | ||
run() { | ||
this._run(); | ||
return this; | ||
})); | ||
} | ||
} | ||
/** | ||
* @override | ||
* @see ClusterProcess | ||
* @private | ||
*/ | ||
Worker.prototype._setupIPCMessagesHandler = function() { | ||
process.on('message', this._onMessage.bind(this, this)); | ||
}; | ||
/** | ||
* Call Master method via RPC | ||
@@ -155,34 +183,2 @@ * @method | ||
/** | ||
* Turns worker to `ready` state. Must be called by worker | ||
* if option `control.triggerReadyStateManually` set `true`. | ||
* @returns {Worker} self | ||
* @public | ||
*/ | ||
Worker.prototype.ready = function() { | ||
if (this._ready) { | ||
throw new LusterWorkerError(LusterWorkerError.CODES.ALREADY_READY); | ||
} | ||
this._ready = true; | ||
this.emit('ready'); | ||
return this; | ||
}; | ||
/** | ||
* Do a remote call to master, wait for master to handle it, then execute registered callback | ||
* @method | ||
* @param {String} opts.command | ||
* @param {Function} opts.callback | ||
* @param {Number} [opts.timeout] in milliseconds | ||
* @param {*} [opts.data] | ||
* @public | ||
*/ | ||
Worker.prototype.remoteCallWithCallback = function(opts) { | ||
var callbackId = RPCCallback.setCallback(this, opts.command, opts.callback, opts.timeout); | ||
this.remoteCall(opts.command, opts.data, callbackId); | ||
}; | ||
module.exports = Worker; |
{ | ||
"name": "luster", | ||
"version": "2.0.1", | ||
"version": "3.0.0-alpha.0", | ||
"description": "Node.js cluster wrapper", | ||
@@ -10,3 +10,3 @@ "main": "./lib/luster.js", | ||
"scripts": { | ||
"lint": "eslint ./lib ./test ./examples", | ||
"lint": "eslint ./lib ./test ./examples ./bin", | ||
"unit": "istanbul test _mocha -- test/unit/test", | ||
@@ -40,4 +40,4 @@ "func": "mocha test/func/test $@", | ||
"dependencies": { | ||
"extend": "^3.0.0", | ||
"objex": "^0.4.1", | ||
"delay": "^3.0.0", | ||
"p-event": "^2.1.0", | ||
"terror": "^1.0.0" | ||
@@ -47,3 +47,4 @@ }, | ||
"chai": "^3.5.0", | ||
"eslint": "^3.15.0", | ||
"chai-as-promised": "^7.1.1", | ||
"eslint": "^4.19.1", | ||
"eslint-config-nodules": "^0.4.0", | ||
@@ -56,4 +57,4 @@ "istanbul": "^0.4.1", | ||
"engines": { | ||
"node": ">=4" | ||
"node": ">=8" | ||
} | ||
} |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
New author
Supply chain riskA new npm collaborator published a version of the package for the first time. New collaborators are usually benign additions to a project, but do indicate a change to the security surface area of a package.
Found 1 instance in 1 package
No v1
QualityPackage is not semver >=1. This means it is not stable and does not support ^ ranges.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
95211
20
2498
8
2
1
+ Addeddelay@^3.0.0
+ Addedp-event@^2.1.0
+ Addeddelay@3.1.0(transitive)
+ Addedp-event@2.3.1(transitive)
+ Addedp-finally@1.0.0(transitive)
+ Addedp-timeout@2.0.1(transitive)
- Removedextend@^3.0.0
- Removedobjex@^0.4.1
- Removedextend@3.0.2(transitive)
- Removedobjex@0.4.1(transitive)