Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

luster

Package Overview
Dependencies
Maintainers
4
Versions
50
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

luster - npm Package Compare versions

Comparing version 2.0.1 to 3.0.0-alpha.0

lib/worker-pool.js

2

bin/luster.js
#!/usr/bin/env node
var /** @type {ClusterProcess} */
const /** @type {ClusterProcess} */
luster = require('../lib/luster'),

@@ -4,0 +4,0 @@ path = require('path'),

@@ -1,2 +0,2 @@

var cluster = require('cluster'),
const cluster = require('cluster'),
path = require('path'),

@@ -8,5 +8,6 @@ RPC = require('./rpc'),

LusterClusterProcessError = require('./errors').LusterClusterProcessError,
LusterConfigurationError = require('./errors').LusterConfigurationError,
ClusterProcess;
LusterConfigurationError = require('./errors').LusterConfigurationError;
const pEvent = require('p-event');
/**

@@ -22,2 +23,23 @@ * @param {Object} context

/**
* Add `basedir`, `node_modules` contained in the `basedir` and its ancestors to `module.paths`
* @param {String} basedir
*/
function extendResolvePath(basedir) {
// using module internals isn't good, but restarting with corrected NODE_PATH looks more ugly, IMO
module.paths.push(basedir);
const _basedir = basedir.split('/'),
size = basedir.length;
let i = 0;
while (size > i++) {
const modulesPath = _basedir.slice(0, i).join('/') + '/node_modules';
if (module.paths.indexOf(modulesPath) === -1) {
module.paths.push(modulesPath);
}
}
}
/**
* @typedef Extension

@@ -32,311 +54,275 @@ * @property {Function} [configure] (Object config, ClusterProcess proc)

*/
ClusterProcess = EventEmitterEx.create(function ClusterProcess() {
ClusterProcess.__super.apply(this, arguments);
class ClusterProcess extends EventEmitterEx {
constructor() {
super();
/** @private */
this._remoteCommands = {};
/** @private */
this._initialized = false;
/** @private */
this.extensions = {};
/** @private */
this._remoteCommands = {};
/** @private */
this.extensions = {};
/**
* @type Promise<void>
* @private
* */
this._initPromise = pEvent(this, 'initialized');
/**
* @type {Configuration}
* @public
*/
this.config = null;
/**
* @type {Configuration}
* @public
*/
this.config = null;
this.once('configured', this._onConfigured.bind(this));
this.once('configured', this._onConfigured.bind(this));
this._setupIPCMessagesHandler();
this._setupIPCMessagesHandler();
this.registerRemoteCommand(RPC.fns.callback, RPCCallback.processCallback.bind(RPCCallback));
});
this.registerRemoteCommand(RPC.fns.callback, RPCCallback.processCallback.bind(RPCCallback));
}
/**
* @memberOf ClusterProcess
* @property {Boolean} isMaster
* @readonly
* @public
*/
Object.defineProperty(ClusterProcess.prototype, 'isMaster', {
value: cluster.isMaster,
enumerable: true
});
/**
* @memberOf ClusterProcess
* @property {Boolean} isWorker
* @readonly
* @public
*/
Object.defineProperty(ClusterProcess.prototype, 'isWorker', {
value: cluster.isWorker,
enumerable: true
});
/**
* @event ClusterProcess#configured
*/
/**
* @fires ClusterProcess#configured
* @param {Object} config
* @param {Boolean} [applyEnv=true]
* @param {String} [basedir=process.cwd()] for Configuration#resolve relative paths
* @returns {ClusterProcess} this
* @throws {LusterConfigurationError} if configuration check failed (check errors will be logged to STDERR)
* @public
*/
ClusterProcess.prototype.configure = function(config, applyEnv, basedir) {
if (typeof applyEnv === 'undefined' || applyEnv) {
Configuration.applyEnvironment(config);
/**
* @memberOf ClusterProcess
* @property {Boolean} isMaster
* @readonly
* @public
*/
get isMaster() {
return cluster.isMaster;
}
if (typeof(basedir) === 'undefined') {
basedir = process.cwd();
/**
* @memberOf ClusterProcess
* @property {Boolean} isWorker
* @readonly
* @public
*/
get isWorker() {
return cluster.isWorker;
}
if (Configuration.check(config) > 0) {
this.emit('error',
LusterConfigurationError.createError(
LusterConfigurationError.CODES.CONFIGURATION_CHECK_FAILED));
} else {
this.config = Configuration.extend(config, basedir);
/**
* @event ClusterProcess#configured
*/
// hack to tweak underlying EventEmitter max listeners
// if your luster-based app extensively use luster events
this.setMaxListeners(this.config.get('maxEventListeners', 100));
/**
* @fires ClusterProcess#configured
* @param {Object} config
* @param {Boolean} [applyEnv=true]
* @param {String} [basedir=process.cwd()] for Configuration#resolve relative paths
* @returns {ClusterProcess} this
* @throws {LusterConfigurationError} if configuration check failed (check errors will be logged to STDERR)
* @public
*/
configure(config, applyEnv, basedir) {
if (typeof applyEnv === 'undefined' || applyEnv) {
Configuration.applyEnvironment(config);
}
this.emit('configured');
}
if (typeof(basedir) === 'undefined') {
basedir = process.cwd();
}
return this;
};
if (Configuration.check(config) > 0) {
this.emit('error',
LusterConfigurationError.createError(
LusterConfigurationError.CODES.CONFIGURATION_CHECK_FAILED));
} else {
this.config = Configuration.extend(config, basedir);
/**
* @param {String} name
* @param {Function} callback function(error)
*/
ClusterProcess.prototype.loadExtension = function(name, callback) {
var /** @type Extension */
extension = require(name),
self = this,
config = this.config.get('extensions.' + name);
// hack to tweak underlying EventEmitter max listeners
// if your luster-based app extensively use luster events
this.setMaxListeners(this.config.get('maxEventListeners', 100));
this.extensions[name] = extension;
this.emit('configured');
}
// if `config` was an Object then it became instance of Configuration
// else returns original value
config = Configuration.extend(config, this.config.getBaseDir());
if (extension.configure.length > 2) {
setImmediate(function() {
extension.configure(config, self, callback);
});
} else {
setImmediate(function() {
extension.configure(config, self);
callback();
});
return this;
}
};
/**
* Add `basedir`, `node_modules` contained in the `basedir` and its ancestors to `module.paths`
* @param {String} basedir
*/
function extendResolvePath(basedir) {
// using module internals isn't good, but restarting with corrected NODE_PATH looks more ugly, IMO
module.paths.push(basedir);
/**
* @param {String} name
* @param {Function} callback function(error)
*/
loadExtension(name, callback) {
const /** @type Extension */
extension = require(name);
let config = this.config.get('extensions.' + name);
var _basedir = basedir.split('/'),
size = basedir.length,
i = 0,
modulesPath;
this.extensions[name] = extension;
while (size > i++) {
modulesPath = _basedir.slice(0, i).join('/') + '/node_modules';
// if `config` was an Object then it became instance of Configuration
// else returns original value
config = Configuration.extend(config, this.config.getBaseDir());
if (module.paths.indexOf(modulesPath) === -1) {
module.paths.push(modulesPath);
if (extension.configure.length > 2) {
setImmediate(() => extension.configure(config, this, callback));
} else {
setImmediate(() => {
extension.configure(config, this);
callback();
});
}
}
}
/**
* @event ClusterProcess#initialized
*/
/**
* @event ClusterProcess#initialized
*/
/**
* @fires ClusterProcess#initialized
* @private
*/
ClusterProcess.prototype._onConfigured = function() {
cluster.setMaxListeners(this.getMaxListeners());
/**
* @fires ClusterProcess#initialized
* @private
*/
_onConfigured() {
cluster.setMaxListeners(this.getMaxListeners());
// try to use `extensionsPath` option to resolve extensions' modules
// use worker file directory as fallback
extendResolvePath(path.resolve(
this.config.resolve('extensionsPath', path.dirname(this.config.resolve('app')))
));
// try to use `extensionsPath` option to resolve extensions' modules
// use worker file directory as fallback
extendResolvePath(path.resolve(
this.config.resolve('extensionsPath', path.dirname(this.config.resolve('app')))
));
var self = this,
extensions = this.config.getKeys('extensions'),
wait = extensions.length,
loadedExtensions = [],
loadTimeout = this.config.get('extensionsLoadTimeout', 10000),
loadTimer;
const extensions = this.config.getKeys('extensions'),
wait = extensions.length,
loadedExtensions = new Set(),
loadTimeout = this.config.get('extensionsLoadTimeout', 10000);
let loadTimer;
if (wait === 0) {
self._initialized = true;
self.emit('initialized');
return;
}
if (wait === 0) {
this.emit('initialized');
return;
}
extensions.forEach(function(name) {
this.loadExtension(name, function(error) {
if (error) {
return self.emit('error', error);
}
extensions.forEach(name => {
this.loadExtension(name, error => {
if (error) {
return this.emit('error', error);
}
loadedExtensions.push(name);
self.emit('extension loaded', name);
loadedExtensions.add(name);
this.emit('extension loaded', name);
if (loadedExtensions.length === wait) {
clearTimeout(loadTimer);
self._initialized = true;
self.emit('initialized');
}
if (loadedExtensions.size === wait) {
clearTimeout(loadTimer);
this.emit('initialized');
}
});
});
}, this);
loadTimer = setTimeout(function() {
var timeouted = extensions.filter(function (name) {
return loadedExtensions.indexOf(name) < 0;
}),
error = LusterClusterProcessError.createError(
LusterClusterProcessError.CODES.EXTENSIONS_LOAD_TIMEOUT,
{ timeouted: timeouted, timeout: loadTimeout });
loadTimer = setTimeout(() => {
const timeouted = extensions.filter(name => !loadedExtensions.has(name)),
error = LusterClusterProcessError.createError(
LusterClusterProcessError.CODES.EXTENSIONS_LOAD_TIMEOUT,
{timeouted, timeout: loadTimeout});
self.emit('error', error);
}, loadTimeout);
};
this.emit('error', error);
}, loadTimeout);
}
/**
* Wrap `fn` to delay it execution to ClusterProcess done initialization.
* @this {ClusterProcess}
* @param {Function} fn
* @returns {Function}
*/
ClusterProcess.whenInitialized = function(fn) {
return /** @this {ClusterProcess} */function() {
if (this._initialized) {
setImmediate(fn.bind(this));
} else {
this.once('initialized', fn.bind(this));
/**
* Resolves when ClusterProcess done initialization.
* @this {ClusterProcess}
* @returns {Promise<void>}
*/
whenInitialized() {
return this._initPromise;
}
/**
* Register `fn` as allowed for remote call via IPC.
* @param {String} name
* @param {Function} fn
* @throws LusterClusterProcessError if remote procedure with `name` already registered.
* @public
*/
registerRemoteCommand(name, fn) {
if (has(this._remoteCommands, name)) {
throw LusterClusterProcessError.createError(
LusterClusterProcessError.CODES.REMOTE_COMMAND_ALREADY_REGISTERED,
{name});
}
return this;
};
};
this._remoteCommands[name] = fn;
}
/**
* Register `fn` as allowed for remote call via IPC.
* @param {String} name
* @param {Function} fn
* @throws LusterClusterProcessError if remote procedure with `name` already registered.
* @public
*/
ClusterProcess.prototype.registerRemoteCommand = function(name, fn) {
if (has(this._remoteCommands, name)) {
throw LusterClusterProcessError.createError(
LusterClusterProcessError.CODES.REMOTE_COMMAND_ALREADY_REGISTERED,
{ name: name });
/**
* Remove previously registered remote command
* @param {String} name
* @public
*/
unregisterRemoteCommand(name) {
delete this._remoteCommands[name];
}
this._remoteCommands[name] = fn;
};
/**
* Remove previously registered remote command
* @param {String} name
* @public
*/
ClusterProcess.prototype.unregisterRemoteCommand = function(name) {
delete this._remoteCommands[name];
};
/**
* Checks is remote command registered.
* @param {String} name
* @returns {Boolean}
*/
ClusterProcess.prototype.hasRegisteredRemoteCommand = function(name) {
return has(this._remoteCommands, name);
};
/**
* @abstract
* @throws LusterClusterProcessError if method is not overriden in the inheritor of ClusterProcess
* @private
*/
ClusterProcess.prototype._setupIPCMessagesHandler = function() {
throw LusterClusterProcessError.createError(
LusterClusterProcessError.CODES.ABSTRACT_METHOD_IS_NOT_IMPLEMENTED,
{
method: 'ClusterProcess#_setupIPCMessagesHandler',
klass: this.constructor.name
});
};
/**
* Call function registered as remote command if `rawMessage` is valid luster IPC message
* @param {WorkerWrapper|Worker} target object with `remoteCall` method which can be used to respond to message
* @param {*} rawMessage
* @see RPC
* @private
*/
ClusterProcess.prototype._onMessage = function(target, rawMessage) {
var message = RPC.parseMessage(rawMessage);
if (message === null) {
return;
/**
* Checks is remote command registered.
* @param {String} name
* @returns {Boolean}
*/
hasRegisteredRemoteCommand(name) {
return has(this._remoteCommands, name);
}
if ( ! has(this._remoteCommands, message.cmd)) {
/**
* @abstract
* @throws LusterClusterProcessError if method is not overriden in the inheritor of ClusterProcess
* @private
*/
_setupIPCMessagesHandler() {
throw LusterClusterProcessError.createError(
LusterClusterProcessError.CODES.REMOTE_COMMAND_IS_NOT_REGISTERED,
LusterClusterProcessError.CODES.ABSTRACT_METHOD_IS_NOT_IMPLEMENTED,
{
name: message.cmd,
method: 'ClusterProcess#_setupIPCMessagesHandler',
klass: this.constructor.name
});
} else if (typeof message.args === 'undefined') {
this._remoteCommands[message.cmd].call(null, target);
} else {
this._remoteCommands[message.cmd].apply(null, [target].concat(message.args));
}
};
/**
* Register remote command with respect to the presence of callback
* @param {String} command
* @param {Function} handler
*/
ClusterProcess.prototype.registerRemoteCommandWithCallback = function(command, handler) {
/**
* @param {ClusterProcess} proc
* @param {*} [data]
* @param {String} callbackId
* Call function registered as remote command if `rawMessage` is valid luster IPC message
* @param {WorkerWrapper|Worker} target object with `remoteCall` method which can be used to respond to message
* @param {*} rawMessage
* @see RPC
* @private
*/
this.registerRemoteCommand(command, function(proc, data, callbackId) {
_onMessage(target, rawMessage) {
const message = RPC.parseMessage(rawMessage);
if (message === null) {
return;
}
if (!has(this._remoteCommands, message.cmd)) {
throw LusterClusterProcessError.createError(
LusterClusterProcessError.CODES.REMOTE_COMMAND_IS_NOT_REGISTERED,
{
name: message.cmd,
klass: this.constructor.name
});
} else if (typeof message.args === 'undefined') {
this._remoteCommands[message.cmd](target);
} else {
this._remoteCommands[message.cmd](target, ...message.args);
}
}
/**
* Register remote command with respect to the presence of callback
* @param {String} command
* @param {Function} handler
*/
registerRemoteCommandWithCallback(command, handler) {
/**
* @param {*} [callbackData]
* @param {ClusterProcess} proc
* @param {*} [data]
* @param {String} callbackId
*/
return handler(function(callbackData) {
proc.remoteCall(RPC.fns.callback, callbackId, callbackData);
}, data);
});
};
this.registerRemoteCommand(command, (proc, data, callbackId) => {
/**
* @param {*} [callbackData]
*/
return handler(callbackData => {
proc.remoteCall(RPC.fns.callback, callbackId, callbackData);
}, data);
});
}
}
module.exports = ClusterProcess;

@@ -1,5 +0,4 @@

var LusterConfigurationError = require('../errors').LusterConfigurationError,
const LusterConfigurationError = require('../errors').LusterConfigurationError,
typeOf = require('./helpers').typeOf,
get = require('./helpers').get,
CHECKS;
get = require('./helpers').get;

@@ -19,3 +18,3 @@ /**

*/
CHECKS = {
const CHECKS = {
// path to worker main module

@@ -55,4 +54,3 @@ 'app': { required: true, type: 'string' },

function checkProperty(path, value, check) {
var type = typeOf(value),
allowedTypes;
const type = typeOf(value);

@@ -71,3 +69,3 @@ // required property

// allowed types
allowedTypes = check.type && [].concat(check.type);
const allowedTypes = check.type && [].concat(check.type);
if (allowedTypes && allowedTypes.indexOf(type) === -1) {

@@ -78,3 +76,3 @@ throw LusterConfigurationError.createError(

property: path,
type: type,
type,
expected: allowedTypes.join(' or ')

@@ -91,15 +89,13 @@ });

function checkConfiguration(conf) {
var failedChecks = 0;
let failedChecks = 0;
Object
.keys(CHECKS)
.forEach(function(path) {
// @todo avoid try..catch
try {
checkProperty(path, get(conf, path), CHECKS[path]);
} catch (error) {
LusterConfigurationError.ensureError(error).log();
++failedChecks;
}
});
for (const path of Object.keys(CHECKS)) {
// @todo avoid try..catch
try {
checkProperty(path, get(conf, path), CHECKS[path]);
} catch (error) {
LusterConfigurationError.ensureError(error).log();
++failedChecks;
}
}

@@ -106,0 +102,0 @@ return failedChecks;

@@ -1,2 +0,2 @@

var util = require('util'),
const util = require('util'),
LusterConfigurationError = require('../errors').LusterConfigurationError;

@@ -9,3 +9,3 @@

function typeOf(value) {
var type = typeof value;
let type = typeof value;

@@ -33,12 +33,11 @@ if (type === 'object') {

function set(context, path, value) {
var ctx = context,
props = path.split('.'),
let ctx = context;
const props = path.split('.'),
target = props.pop(),
i, size,
propName,
type;
size = props.length;
for (i = 0, size = props.length; i < size; i++) {
propName = props[i];
type = typeOf(ctx[propName]);
for (let i = 0; i < size; i++) {
const propName = props[i];
const type = typeOf(ctx[propName]);

@@ -71,8 +70,8 @@ if (type === 'undefined') {

var props = path.split('.'),
prop = props[0],
i, size,
ctx = context;
const props = path.split('.'),
size = props.length;
for (i = 0, size = props.length; i < size; prop = props[++i]) {
let ctx = context;
for (let i = 0, prop = props[0]; i < size; prop = props[++i]) {
if (typeof ctx === 'undefined' || ctx === null ||

@@ -99,8 +98,8 @@ ! Object.prototype.hasOwnProperty.call(ctx, prop)) {

var props = path.split('.'),
prop = props[0],
i, size,
ctx = context;
const props = path.split('.'),
size = props.length;
for (i = 0, size = props.length; i < size; prop = props[++i]) {
let ctx = context;
for (let i = 0, prop = props[0]; i < size; prop = props[++i]) {
if (typeof ctx === 'undefined' || ctx === null ||

@@ -118,6 +117,6 @@ ! Object.prototype.hasOwnProperty.call(ctx, prop)) {

module.exports = {
typeOf: typeOf,
has: has,
get: get,
set: set,
typeOf,
has,
get,
set,
};

@@ -1,2 +0,2 @@

var path = require('path'),
const path = require('path'),
typeOf = require('./helpers').typeOf,

@@ -11,174 +11,193 @@ get = require('./helpers').get,

*/
function Configuration(config, basedir) {
/** @private */
this._resolveBaseDir = basedir || process.cwd();
class Configuration {
constructor(config, basedir) {
/** @private */
this._resolveBaseDir = basedir || process.cwd();
Object
.keys(config)
.forEach(function(propName) {
this[propName] = config[propName];
}, this);
}
if (config instanceof Configuration) {
config = config._rawConfig;
}
this._rawConfig = {};
Object.assign(this._rawConfig, config);
}
/**
* @param {String} path
* @param {*} [defaultValue]
* @returns {*}
* @see get
* @public
*/
Configuration.prototype.get = function(path, defaultValue) {
return get(this, path, defaultValue);
};
get raw() {
return this._rawConfig;
}
/**
* @param {String} path
* @returns {Boolean}
* @see has
* @public
*/
Configuration.prototype.has = function(path) {
return has(this, path);
};
/**
* @param {String} path
* @param {*} [defaultValue]
* @returns {*}
* @see get
* @public
*/
get(path, defaultValue) {
return get(this._rawConfig, path, defaultValue);
}
/**
* Shortcut for `Object.keys(c.get('some.obj.prop', {}))`
* @param {String} [path]
* @returns {String[]} keys of object property by path or
* empty array if property doesn't exists or not an object
* @public
*/
Configuration.prototype.getKeys = function(path) {
var obj = get(this, path);
/**
* @param {String} path
* @returns {Boolean}
* @see has
* @public
*/
has(path) {
return has(this._rawConfig, path);
}
if (typeOf(obj) !== 'object') {
return [];
} else {
return Object.keys(obj);
/**
* Shortcut for `Object.keys(c.get('some.obj.prop', {}))`
* @param {String} [path]
* @returns {String[]} keys of object property by path or
* empty array if property doesn't exists or not an object
* @public
*/
getKeys(path) {
const obj = get(this._rawConfig, path);
if (typeOf(obj) !== 'object') {
return [];
} else {
return Object.keys(obj);
}
}
};
/**
* Shortcut for `path.resolve(process.cwd(), c.get(path, 'default.file'))`
* @param {String} propPath
* @param {String} [defaultPath]
* @returns {String} absolute path
* @public
*/
Configuration.prototype.resolve = function(propPath, defaultPath) {
return path.resolve(
this._resolveBaseDir,
get(this, propPath, defaultPath));
};
/**
* Shortcut for `path.resolve(process.cwd(), c.get(path, 'default.file'))`
* @param {String} propPath
* @param {String} [defaultPath]
* @returns {String} absolute path
* @public
*/
resolve(propPath, defaultPath) {
return path.resolve(
this._resolveBaseDir,
get(this._rawConfig, propPath, defaultPath));
}
/**
* @returns {String} base dir used in `resolve()`
* @public
*/
Configuration.prototype.getBaseDir = function() {
return this._resolveBaseDir;
};
/**
* @returns {String} base dir used in `resolve()`
* @public
*/
getBaseDir() {
return this._resolveBaseDir;
}
/**
* Create Configuration instance from plain object
* @param {Object|*} config
* @param {String} basedir - base dir for `resolve` method
* @returns {Configuration|*} Configuration instance if `config` is object or `config` itself in other case
* @public
* @static
*/
Configuration.extend = function(config, basedir) {
return typeOf(config) === 'object' ?
new Configuration(config, basedir) :
config;
};
/**
* Create Configuration instance from plain object
* @param {Object|*} config
* @param {String} basedir - base dir for `resolve` method
* @returns {Configuration|*} Configuration instance if `config` is object or `config` itself in other case
* @public
* @static
*/
static extend(config, basedir) {
return typeOf(config) === 'object' ?
new Configuration(config, basedir) :
config;
}
/**
* Override config properties using `LUSTER_CONF` environment variable.
*
* @description
* LUSTER_CONF='PATH=VALUE;...'
*
* ; – properties separator;
* = – property and value separator;
* PATH – property path;
* VALUE – property value, JSON.parse applied to it,
* if JSON.parse failed, then value used as string.
* You MUST quote a string if it contains semicolon.
*
* Spaces between PATH, "=", ";" and VALUE are insignificant.
*
* @example
* LUSTER_CONF='server.port=8080'
* # { server: { port: 8080 } }
*
* LUSTER_CONF='app=./worker_debug.js; workers=1'
* # { app: "./worker_debug.js", workers : 1 }
*
* LUSTER_CONF='logStream='
* # remove option "logStream"
*
* LUSTER_CONF='server={"port":8080}'
* # { server: { port: 8080 } }
*
* @param {Object} config
* @throws {LusterConfigurationError} if you trying to
* set property of atomic property, for example,
* error will be thrown if you have property
* `extensions.sample.x = 10` in the configuration and
* environment variable `LUSTER_EXTENSIONS_SAMPLE_X_Y=5`
*/
Configuration.applyEnvironment = function(config) {
if ( ! process.env.LUSTER_CONF) {
return;
extend(config) {
return new Configuration(
{
...this.raw,
...config,
},
this._resolveBaseDir,
);
}
function parseProp(prop) {
var delimeterPos = prop.indexOf('='),
propPath,
propValue;
if (delimeterPos === 0 || delimeterPos === -1) {
/**
* Override config properties using `LUSTER_CONF` environment variable.
*
* @description
* LUSTER_CONF='PATH=VALUE;...'
*
* ; – properties separator;
* = – property and value separator;
* PATH – property path;
* VALUE – property value, JSON.parse applied to it,
* if JSON.parse failed, then value used as string.
* You MUST quote a string if it contains semicolon.
*
* Spaces between PATH, "=", ";" and VALUE are insignificant.
*
* @example
* LUSTER_CONF='server.port=8080'
* # { server: { port: 8080 } }
*
* LUSTER_CONF='app=./worker_debug.js; workers=1'
* # { app: "./worker_debug.js", workers : 1 }
*
* LUSTER_CONF='logStream='
* # remove option "logStream"
*
* LUSTER_CONF='server={"port":8080}'
* # { server: { port: 8080 } }
*
* @param {Object} config
* @throws {LusterConfigurationError} if you trying to
* set property of atomic property, for example,
* error will be thrown if you have property
* `extensions.sample.x = 10` in the configuration and
* environment variable `LUSTER_EXTENSIONS_SAMPLE_X_Y=5`
*/
static applyEnvironment(config) {
if (!process.env.LUSTER_CONF) {
return;
}
propPath = prop.substr(0, delimeterPos).trim();
propValue = prop.substr(delimeterPos + 1).trim();
if (config instanceof Configuration) {
config = config._rawConfig;
}
if (propValue === '') {
propValue = undefined;
} else {
try {
// try to parse propValue as JSON,
// if parsing failed use raw `propValue` as string
propValue = JSON.parse(propValue);
} catch(error) { // eslint-disable-line no-empty
function parseProp(prop) {
const delimeterPos = prop.indexOf('=');
if (delimeterPos === 0 || delimeterPos === -1) {
return;
}
const propPath = prop.substr(0, delimeterPos).trim();
let propValue = prop.substr(delimeterPos + 1).trim();
if (propValue === '') {
propValue = undefined;
} else {
try {
// try to parse propValue as JSON,
// if parsing failed use raw `propValue` as string
propValue = JSON.parse(propValue);
} catch (error) { // eslint-disable-line no-empty
}
}
set(config, propPath, propValue);
}
set(config, propPath, propValue);
}
const conf = process.env.LUSTER_CONF;
var conf = process.env.LUSTER_CONF,
lastSeparator = -1,
i = 0,
openQuote = false;
let lastSeparator = -1,
i = 0,
openQuote = false;
while (conf.length > i++) {
switch (conf[i]) {
case '"' :
openQuote = ! openQuote;
break;
case ';' :
if ( ! openQuote) {
parseProp(conf.substring(lastSeparator + 1, i));
lastSeparator = i;
while (conf.length > i++) {
switch (conf[i]) {
case '"' :
openQuote = !openQuote;
break;
case ';' :
if (!openQuote) {
parseProp(conf.substring(lastSeparator + 1, i));
lastSeparator = i;
}
}
}
}
if (lastSeparator < conf.length) {
parseProp(conf.substring(lastSeparator + 1));
if (lastSeparator < conf.length) {
parseProp(conf.substring(lastSeparator + 1));
}
}
};
}

@@ -185,0 +204,0 @@ Configuration.check = require('./check');

@@ -1,2 +0,2 @@

var Terror = require('terror'),
const Terror = require('terror'),
errors = {};

@@ -9,3 +9,3 @@

*/
var LusterError = errors.LusterError = Terror.create('LusterError',
const LusterError = errors.LusterError = Terror.create('LusterError',
{

@@ -98,2 +98,13 @@ ABSTRACT_METHOD_IS_NOT_IMPLEMENTED:

/**
* @constructor
* @class LusterMasterError
* @augments LusterError
*/
errors.LusterMasterError = LusterError.create('LusterMasterError',
{
POOL_KEY_ALREADY_TAKEN:
'Pool key "%key%" is already taken'
});
module.exports = errors;

@@ -1,12 +0,10 @@

var Objex = require('objex'),
util = require('util'),
EventEmitter = require('events').EventEmitter,
const util = require('util'),
{ EventEmitter } = require('events').EventEmitter;
/**
* @constructor
* @class EventEmitterEx
* @augments EventEmitter
* @augments Objex
*/
EventEmitterEx = Objex.wrap(EventEmitter).create();
/**
* @constructor
* @class EventEmitterEx
* @augments EventEmitter
*/
class EventEmitterEx extends EventEmitter {}

@@ -19,10 +17,10 @@ function inspect(val) {

if (process.env.NODE_DEBUG && /luster:eex/i.test(process.env.NODE_DEBUG)) {
EventEmitterEx.prototype.emit = function() {
var args = Array.prototype
.slice.call(arguments, 0)
.map(inspect);
EventEmitterEx.prototype.emit = function(...args) {
const inspectedArgs = args.map(inspect).join(', ');
console.log('%s(%s).emit(%s)', this.constructor.name || 'EventEmitterEx', this.wid, args.join(', '));
const key = this.eexKey;
return EventEmitterEx.__super.prototype.emit.apply(this, arguments);
console.log('%s(%s).emit(%s)', this.constructor.name || 'EventEmitterEx', key, inspectedArgs);
return EventEmitter.prototype.emit.apply(this, args);
};

@@ -29,0 +27,0 @@ }

@@ -1,2 +0,2 @@

var cluster = require('cluster'),
const cluster = require('cluster'),
/** @type {ClusterProcess} */

@@ -3,0 +3,0 @@ Proc = require(cluster.isMaster ? './master' : './worker');

@@ -1,10 +0,11 @@

var os = require('os'),
cluster = require('cluster'),
extend = require('extend'),
ClusterProcess = require('./cluster_process'),
WorkerWrapper = require('./worker_wrapper'),
Port = require('./port'),
RestartQueue = require('./restart_queue'),
Master;
const cluster = require('cluster');
const ClusterProcess = require('./cluster_process');
const LusterMasterError = require('./errors').LusterMasterError;
const RPC = require('./rpc');
const WorkerPool = require('./worker-pool');
const WorkerWrapper = require('./worker_wrapper');
const DEFAULT_POOL_KEY = '__default';
/**

@@ -15,453 +16,363 @@ * @constructor

*/
Master = ClusterProcess.create(function Master() {
Master.__super.apply(this, arguments);
class Master extends ClusterProcess {
constructor() {
super();
/**
* Configuration object to pass to cluster.setupMaster()
* @type {Object}
* @private
*/
this._masterOpts = {};
this.pools = new Map();
this.createPool(DEFAULT_POOL_KEY);
this.id = 0;
this.wid = 0;
this.eexKey = 0;
this.pid = process.pid;
// @todo make it optional?
process.on('SIGINT', this._onSignalQuit.bind(this));
process.on('SIGQUIT', this._onSignalQuit.bind(this));
}
createPool(key) {
if (this.pools.has(key)) {
throw LusterMasterError.createError(
LusterMasterError.CODES.POOL_KEY_ALREADY_TAKEN,
{key}
);
}
this.emit('create pool', key);
const pool = new WorkerPool(key, this);
this._proxyWorkerEvents(pool);
pool.on('shutdown', this._checkPoolsAlive.bind(this));
this.pools.set(key, pool);
return pool;
}
getPool(key) {
return this.pools.get(key);
}
/**
* @type {Object}
* @property {WorkerWrapper} *
* @public
* @todo make it private or public immutable
* Allows same object structure as cluster.setupMaster().
* This function must be used instead of cluster.setupMaster(),
* because all calls of cluster.setupMaster() ignored, except first one.
* An instance of Master will call it, when running.
* @param {Object} opts
* @see {@link http://nodejs.org/api/cluster.html#cluster_cluster_setupmaster_settings}
*/
this.workers = {};
setup(opts) {
Object.assign(this._masterOpts, opts);
}
/**
* Workers restart queue.
* @type {RestartQueue}
* SIGINT and SIGQUIT handler
* @private
*/
this._restartQueue = new RestartQueue();
_onSignalQuit() {
this
.once('shutdown', () => process.exit(0))
.shutdown();
}
/**
* Configuration object to pass to cluster.setupMaster()
* @type {Object}
* Check for alive workers, if no one here, then emit "shutdown".
* @private
*/
this._masterOpts = {};
_checkPoolsAlive() {
let dead = true;
this.forEachPool(pool => dead = dead && pool.dead);
this.id = 0;
this.wid = 0;
this.pid = process.pid;
if (dead) {
this.emit('shutdown');
}
}
this.on('worker state', this._cleanupUnixSockets.bind(this));
this.on('worker exit', this._checkWorkersAlive.bind(this));
/**
* Repeat WorkerWrapper events from WorkerPool on Master
* so for example 'online' became 'worker online'
* @private
* @param {WorkerPool} pool
*/
_proxyWorkerEvents(pool) {
for (const eventName of WorkerWrapper.EVENTS) {
const proxyEventName = 'worker ' + eventName;
pool.on(proxyEventName, this.emit.bind(this, proxyEventName));
}
}
// @todo make it optional?
process.on('SIGINT', this._onSignalQuit.bind(this));
process.on('SIGQUIT', this._onSignalQuit.bind(this));
});
/**
* @returns {number[]} workers ids array
*/
getWorkersIds() {
return this.getWorkersArray().map(w => w.wid);
}
/**
* Allows same object structure as cluster.setupMaster().
* This function must be used instead of cluster.setupMaster(),
* because all calls of cluster.setupMaster() ignored, except first one.
* An instance of Master will call it, when running.
* @param {Object} opts
* @see {@link http://nodejs.org/api/cluster.html#cluster_cluster_setupmaster_settings}
*/
Master.prototype.setup = function(opts) {
extend(this._masterOpts, opts);
};
/**
* @returns {WorkerWrapper[]} workers array
*/
getWorkersArray() {
let result = [];
this.forEachPool(
pool => result = result.concat(pool.getWorkersArray())
);
return result;
}
/**
* SIGINT and SIGQUIT handler
* @private
*/
Master.prototype._onSignalQuit = function() {
this
.once('shutdown', function() {
process.exit(0);
})
.shutdown();
};
/**
* Remove not used unix socket before worker will try to listen it.
* @param {WorkerWrapper} worker
* @param {WorkerWrapperState} state
* @private
*/
Master.prototype._cleanupUnixSockets = function(worker, state) {
var port = worker.options.port;
if (this._restartQueue.has(worker) ||
state !== WorkerWrapper.STATES.LAUNCHING ||
port.family !== Port.UNIX) {
return;
forEachPool(fn) {
for (const pool of this.pools.values()) {
fn(pool);
}
}
var self = this,
inUse = this.getWorkersArray().some(function(w) {
return worker.wid !== w.wid &&
w.isRunning() &&
port.isEqualTo(w.options.port);
/**
* Iterate over workers in the pool.
* @param {Function} fn
* @public
* @returns {Master} self
*
* @description Shortcut for:
* master.getWorkersArray().forEach(fn);
*/
forEach(fn) {
this.forEachPool(pool => {
pool.forEach(fn);
});
if ( ! inUse) {
port.unlink(function(err) {
if (err) {
self.emit('error', err);
}
});
return this;
}
};
/**
* Check for alive workers, if no one here, then emit "shutdown".
* @private
*/
Master.prototype._checkWorkersAlive = function() {
var workers = this.getWorkersArray(),
alive = workers.reduce(function(count, w) {
return w.dead ? count - 1 : count;
}, workers.length);
if (alive === 0) {
this.emit('shutdown');
/**
* Broadcast an event received by IPC from worker as 'worker <event>'.
* @param {WorkerWrapper} worker
* @param {String} event
* @param {...*} args
*/
broadcastWorkerEvent(worker, event, ...args) {
this.emit('received worker ' + event, worker, ...args);
}
};
/**
* Repeat WorkerWrapper events on Master and add 'worker ' prefix to event names
* so for example 'online' became 'worker online'
* @private
* @param {WorkerWrapper} worker
*/
Master.prototype._proxyWorkerEvents = function(worker) {
WorkerWrapper.EVENTS
.forEach(function(eventName) {
worker.on(eventName, this.emit.bind(this, 'worker ' + eventName, worker));
}, this);
};
/**
* Configure cluster
* @override ClusterProcess
* @private
*/
_onConfigured() {
super._onConfigured();
/**
* @returns {WorkerWrapper[]} workers array
*/
Master.prototype.getWorkersArray = function() {
if ( ! this._workersArrayCache) {
this._workersArrayCache = Object.keys(this.workers).map(function(wid) {
return this.workers[wid];
}, this);
// register global remote command in the context of master to receive events from master
if (!this.hasRegisteredRemoteCommand(RPC.fns.master.broadcastWorkerEvent)) {
this.registerRemoteCommand(
RPC.fns.master.broadcastWorkerEvent,
this.broadcastWorkerEvent.bind(this)
);
}
this.pools.get(DEFAULT_POOL_KEY).configure(this.config);
}
return this._workersArrayCache;
};
/**
* @param {Number[]} wids Array of `WorkerWrapper#wid` values
* @param {String} event wait for
* @public
* @returns {Promise<void>}
*/
waitForWorkers(wids, event) {
const promises = [];
this.forEachPool(
pool => promises.push(pool.waitForWorkers(wids, event))
);
/**
* Add worker to the pool
* @param {WorkerWrapper} worker
* @returns {Master} self
* @public
*/
Master.prototype.add = function(worker) {
// invalidate Master#getWorkersArray cache
this._workersArrayCache = null;
return Promise.all(promises);
}
this.workers[worker.wid] = worker;
this._proxyWorkerEvents(worker);
/**
* @param {String} event wait for
* @public
* @returns {Promise<void>}
*/
waitForAllWorkers(event) {
return this.waitForWorkers(
this.getWorkersIds(),
event
);
}
return this;
};
/**
* @event Master#running
*/
/**
* Iterate over workers in the pool.
* @param {Function} fn
* @public
* @returns {Master} self
*
* @description Shortcut for:
* master.getWorkersArray().forEach(function(worker) {
* // fn
* }, master);
*/
Master.prototype.forEach = function(fn) {
this.getWorkersArray().forEach(function(worker) {
fn.call(this, worker);
}, this);
/**
* @event Master#restarted
*/
return this;
};
async _restart() {
// TODO maybe run this after starting waitForAllWorkers
this.forEachPool(pool => pool.restart());
/**
* Configure cluster
* @override ClusterProcess
* @private
*/
Master.prototype._onConfigured = function() {
Master.__super.prototype._onConfigured.apply(this, arguments);
await this.waitForAllWorkers('worker ready');
var // WorkerWrapper options
forkTimeout = this.config.get('control.forkTimeout'),
stopTimeout = this.config.get('control.stopTimeout'),
exitThreshold = this.config.get('control.exitThreshold'),
allowedSequentialDeaths = this.config.get('control.allowedSequentialDeaths'),
this.emit('restarted');
}
// workers and groups count
i = 0,
count = this.config.get('workers', os.cpus().length),
isServerPortSet = this.config.has('server.port'),
port,
groups = this.config.get('server.groups', 1),
group = 0,
workersPerGroup = Math.floor(count / groups),
workersInGroup = 0;
if (isServerPortSet) {
port = new Port(this.config.get('server.port'));
/**
* Hard workers restart: all workers will be restarted at same time.
* CAUTION: if dead worker is restarted, it will emit 'error' event.
* @public
* @returns {Master} self
* @fires Master#restarted when workers spawned and ready.
*/
restart() {
this._restart();
return this;
}
// create pool of workers
while (count > i++) {
this.add(new WorkerWrapper(this, {
forkTimeout: forkTimeout,
stopTimeout: stopTimeout,
exitThreshold: exitThreshold,
allowedSequentialDeaths: allowedSequentialDeaths,
port: isServerPortSet ? port.next(group) : 0,
maxListeners: this.getMaxListeners(),
}));
async _softRestart() {
const promises = [];
this.forEachPool(
pool => promises.push(pool.softRestart())
);
// groups > 1, current group is full and
// last workers can form at least more one group
if (groups > 1 &&
++workersInGroup >= workersPerGroup &&
count - (group + 1) * workersPerGroup >= workersPerGroup) {
workersInGroup = 0;
group++;
}
await Promise.all(promises);
this.emit('restarted');
}
};
/**
* @param {Number[]} wids Array of `WorkerWrapper#wid` values
* @param {String} event wait for
* @param {Function} callback
* @public
* @returns {Master} self
*/
Master.prototype.waitForWorkers = function(wids, event, callback) {
var self = this;
/**
* Workers will be restarted one by one using RestartQueue.
* If a worker becomes dead, it will be just removed from restart queue. However, if already dead worker is pushed
* into the queue, it will emit 'error' on restart.
* @public
* @returns {Master} self
* @fires Master#restarted when workers spawned and ready.
*/
softRestart() {
this._softRestart();
return this;
}
function onWorkerState(worker) {
var idx = wids.indexOf(worker.wid);
/**
* @override
* @see ClusterProcess
* @private
*/
_setupIPCMessagesHandler() {
this.on('worker message', this._onMessage.bind(this));
}
if (idx > -1) {
wids.splice(idx, 1);
}
/**
* RPC to all workers
* @method
* @param {String} name of called command in the worker
* @param {...*} args
* @public
*/
remoteCallToAll(name, ...args) {
this.forEachPool(pool => pool.remoteCallToAll(name, ...args));
}
if (wids.length === 0) {
self.removeListener(event, onWorkerState);
callback.call(self);
}
/**
* Broadcast event to all workers.
* @method
* @param {String} event of called command in the worker
* @param {...*} args
* @public
*/
broadcastEventToAll(event, ...args) {
this.forEachPool(pool => pool.broadcastEventToAll(event, ...args));
}
if (wids.length > 0) {
this.on(event, onWorkerState);
} else {
setImmediate(callback.bind(self));
/**
* Emit event on master and all workers in "ready" state.
* @method
* @param {String} event of called command in the worker
* @param {...*} args
* @public
*/
emitToAll(event, ...args) {
this.forEachPool(pool => pool.emitToAll(event, ...args));
}
return this;
};
/**
* @event Master#shutdown
*/
/**
* @event Master#running
*/
async _shutdown() {
const promises = [];
this.forEachPool(
pool => promises.push(pool._shutdown())
);
/**
* Fork workers.
* Execution will be delayed until Master became configured
* (`configured` event fired).
* @method
* @returns {Master} self
* @public
* @fires Master#running then workers spawned and ready.
*
* @example
* // file: master.js
* var master = require('luster');
*
* master
* .configure({ app : 'worker' })
* .run();
*
* // there is master is still not running anyway
* // it will run immediate once configured and
* // current thread execution done
*/
Master.prototype.run = Master.whenInitialized(function() {
cluster.setupMaster(this._masterOpts);
await Promise.all(promises);
this.waitForWorkers(
this.getWorkersArray().map(function(worker) {
worker.run();
this.emit('shutdown');
}
return worker.wid;
}),
'worker ready',
function() {
this.emit('running');
});
/**
* Stop all workers and emit `Master#shutdown` event after successful shutdown of all workers.
* @fires Master#shutdown
* @returns {Master}
*/
shutdown() {
this._shutdown();
return this;
}
return this;
});
/**
* Do a remote call to all workers, callbacks are registered and then executed separately for each worker
* @method
* @param {String} opts.command
* @param {Function} opts.callback
* @param {Number} [opts.timeout] in milliseconds
* @param {*} [opts.data]
* @public
*/
remoteCallToAllWithCallback(opts) {
this.forEachPool(pool => pool.remoteCallToAllWithCallback(opts));
}
/**
* @event Master#restarted
*/
async _run() {
await this.whenInitialized();
/**
* Hard workers restart: all workers will be restarted at same time.
* CAUTION: if dead worker is restarted, it will emit 'error' event.
* @public
* @returns {Master} self
* @fires Master#restarted when workers spawned and ready.
*/
Master.prototype.restart = function() {
this.waitForWorkers(
this.getWorkersArray().map(function(worker) {
worker.restart();
cluster.setupMaster(this._masterOpts);
return worker.wid;
}),
'worker ready',
function() {
this.emit('restarted');
});
// TODO maybe run this after starting waitForAllWorkers
this.forEachPool(pool => pool.run());
return this;
};
await this.waitForAllWorkers('worker ready');
/**
* Workers will be restarted one by one using RestartQueue.
* If a worker becomes dead, it will be just removed from restart queue. However, if already dead worker is pushed
* into the queue, it will emit 'error' on restart.
* @public
* @returns {Master} self
* @fires Master#restarted when workers spawned and ready.
*/
Master.prototype.softRestart = function() {
this.forEach(function(worker) {
worker.softRestart();
});
this._restartQueue.once('drain', this.emit.bind(this, 'restarted'));
return this;
};
this.emit('running');
}
/**
* Schedules one worker restart using RestartQueue.
* If a worker becomes dead, it will be just removed from restart queue. However, if already dead worker is pushed
* into the queue, it will emit 'error' on restart.
* @public
* @param {WorkerWrapper} worker
* @returns {Master} self
*/
Master.prototype.scheduleWorkerRestart = function(worker) {
this._restartQueue.push(worker);
return this;
};
/**
* Fork workers.
* Execution will be delayed until Master became configured
* (`configured` event fired).
* @method
* @returns {Master} self
* @public
* @fires Master#running then workers spawned and ready.
*
* @example
* // file: master.js
* var master = require('luster');
*
* master
* .configure({ app : 'worker' })
* .run();
*
* // there is master is still not running anyway
* // it will run immediate once configured and
* // current thread execution done
*/
run() {
this._run();
return this;
}
}
/**
* @override
* @see ClusterProcess
* @private
*/
Master.prototype._setupIPCMessagesHandler = function() {
this.on('worker message', this._onMessage.bind(this));
};
/**
* RPC to all workers
* @method
* @param {String} name of called command in the worker
* @param {*} ...args
* @public
*/
Master.prototype.remoteCallToAll = function(name) { // eslint-disable-line no-unused-vars
var args = Array.prototype.slice.call(arguments, 0);
this.forEach(function(worker) {
if (worker.ready) {
worker.remoteCall.apply(worker, args);
} else {
worker.on('ready', function() {
worker.remoteCall.apply(worker, args);
}.bind(worker, args));
}
});
};
/**
* Broadcast event to all workers.
* @method
* @param {String} event of called command in the worker
* @param {*} ...args
* @public
*/
Master.prototype.broadcastEventToAll = function() {
var args = Array.prototype.slice.call(arguments, 0);
this.forEach(function(worker) {
if (worker.ready) {
worker.broadcastEvent.apply(worker, args);
}
});
};
/**
* Emit event on master and all workers in "ready" state.
* @method
* @param {String} event of called command in the worker
* @param {*} ...args
* @public
*/
Master.prototype.emitToAll = function() {
this.emit.apply(this, arguments);
this.broadcastEventToAll.apply(this, arguments);
};
/**
* @event Master#shutdown
*/
/**
* Stop all workers and emit `Master#shutdown` event after successful shutdown of all workers.
* @fires Master#shutdown
* @returns {Master}
*/
Master.prototype.shutdown = function() {
var stoppedWorkers = [];
this.forEach(function(worker) {
if (worker.isRunning()) {
worker.stop();
stoppedWorkers.push(worker.wid);
}
});
this.waitForWorkers(
stoppedWorkers,
'worker exit',
function() {
this.emit('shutdown');
});
return this;
};
/**
* Do a remote call to all workers, callbacks are registered and then executed separately for each worker
* @method
* @param {String} opts.command
* @param {Function} opts.callback
* @param {Number} [opts.timeout] in milliseconds
* @param {*} [opts.data]
* @public
*/
Master.prototype.remoteCallToAllWithCallback = function(opts) {
this.forEach(function(worker) {
if (worker.isRunning()) {
worker.remoteCallWithCallback(opts);
}
});
};
module.exports = Master;

@@ -1,2 +0,2 @@

var fs = require('fs'),
const fs = require('fs'),
LusterPortError = require('./errors').LusterPortError,

@@ -19,4 +19,6 @@ UNIX_SOCKET_MASK = '*';

*/
function Port(value) {
this.value = value;
class Port {
constructor(value) {
this.value = value;
}

@@ -29,102 +31,100 @@ /**

*/
Object.defineProperty(this, 'family', {
get: function() {
return isUnixSocket(this.value) ? Port.UNIX : Port.INET;
}
});
}
/**
* @property {String} UNIX
* @memberOf {Port}
* @readonly
*/
/**
* @property {String} INET
* @memberOf {Port}
* @readonly
*/
['UNIX', 'INET'].forEach(function(family) {
Object.defineProperty(Port, family, {
value: family.toLowerCase(),
enumerable: true
});
});
/**
* @param {*} port
* @returns {Boolean}
* @public
*/
Port.prototype.isEqualTo = function(port) {
if ( ! (port instanceof Port)) {
return false;
get family() {
return isUnixSocket(this.value) ? Port.UNIX : Port.INET;
}
return this.value === port.value;
};
/**
* @param {*} port
* @returns {Boolean}
* @public
*/
isEqualTo(port) {
if (!(port instanceof Port)) {
return false;
}
/**
* @param {Number|String} [it=1]
* @returns {Port}
* @public
*/
Port.prototype.next = function(it) {
if (typeof it === 'undefined') {
it = 1;
return this.value === port.value;
}
var newVal = this.family === Port.UNIX ?
this.value.replace(UNIX_SOCKET_MASK, it.toString()) :
Number(this.value) + it;
/**
* @param {Number|String} [it=1]
* @returns {Port}
* @public
*/
next(it) {
if (typeof it === 'undefined') {
it = 1;
}
return new Port(newVal);
};
const newVal = this.family === Port.UNIX ?
this.value.replace(UNIX_SOCKET_MASK, it.toString()) :
Number(this.value) + it;
/**
* @param {Error} [err]
* @param {Function} cb
*/
Port.prototype.unlink = function(err, cb) {
if ( ! cb && typeof err === 'function') {
cb = err;
err = undefined;
return new Port(newVal);
}
if (err) {
cb(LusterPortError
.createError(LusterPortError.CODES.UNKNOWN_ERROR, err));
return;
}
/**
* @param {Error} [err]
* @param {Function} cb
*/
unlink(err, cb) {
if (!cb && typeof err === 'function') {
cb = err;
err = undefined;
}
var value = this.value;
if (err) {
cb(LusterPortError
.createError(LusterPortError.CODES.UNKNOWN_ERROR, err));
return;
}
if (this.family !== Port.UNIX) {
cb(LusterPortError
.createError(LusterPortError.CODES.NOT_UNIX_SOCKET)
.bind({ value: value }));
return;
}
const value = this.value;
fs.unlink(value, function(err) {
if (err && err.code !== 'ENOENT') {
if (this.family !== Port.UNIX) {
cb(LusterPortError
.createError(LusterPortError.CODES.CAN_NOT_UNLINK_UNIX_SOCKET, err)
.bind({ socketPath: value }));
.createError(LusterPortError.CODES.NOT_UNIX_SOCKET)
.bind({value}));
return;
}
cb();
});
};
fs.unlink(value, err => {
if (err && err.code !== 'ENOENT') {
cb(LusterPortError
.createError(LusterPortError.CODES.CAN_NOT_UNLINK_UNIX_SOCKET, err)
.bind({socketPath: value}));
return;
}
Port.prototype.toString = function() {
return this.value;
};
cb();
});
}
Port.prototype.valueOf = function() {
return this.value;
};
toString() {
return this.value;
}
valueOf() {
return this.value;
}
/**
* @property {String} UNIX
* @memberOf {Port}
* @readonly
*/
static get UNIX() {
return 'unix';
}
/**
* @property {String} INET
* @memberOf {Port}
* @readonly
*/
static get INET() {
return 'inet';
}
}
module.exports = Port;

@@ -1,4 +0,3 @@

var EventEmitterEx = require('./event_emitter_ex'),
WorkerWrapper = require('./worker_wrapper'),
RestartQueue;
const EventEmitterEx = require('./event_emitter_ex'),
WorkerWrapper = require('./worker_wrapper');

@@ -18,76 +17,82 @@ /**

*/
RestartQueue = EventEmitterEx.create(function RestartQueue() {
/**
* @type {WorkerWrapper[]}
* @private
*/
this._queue = [];
});
class RestartQueue extends EventEmitterEx {
constructor(eexKey = undefined) {
super();
/**
* Adds new worker in restart queue. Does nothing if worker is already in queue.
* @public
* @param {WorkerWrapper} worker
*/
RestartQueue.prototype.push = function(worker) {
if (this.has(worker)) {
// Worker is already in queue, do nothing
return;
/**
* @type {WorkerWrapper[]}
* @private
*/
this._queue = [];
this.eexKey = eexKey;
}
var removeWorker = function() {
worker.removeListener('ready', removeWorker);
worker.removeListener('state', checkDead);
this._remove(worker);
}.bind(this);
var checkDead = function(state) {
if (state === WorkerWrapper.STATES.STOPPED && worker.dead) {
removeWorker();
/**
* Adds new worker in restart queue. Does nothing if worker is already in queue.
* @public
* @param {WorkerWrapper} worker
*/
push(worker) {
if (this.has(worker)) {
// Worker is already in queue, do nothing
return;
}
};
worker.on('ready', removeWorker);
worker.on('state', checkDead);
this._queue.push(worker);
this._process();
};
const removeWorker = () => {
worker.removeListener('ready', removeWorker);
worker.removeListener('state', checkDead);
this._remove(worker);
};
/**
* Returns true if specified worker is in this queue.
* @public
* @param {WorkerWrapper} worker
* @returns {Boolean}
*/
RestartQueue.prototype.has = function(worker) {
return this._queue.indexOf(worker) !== -1;
};
const checkDead = state => {
if (state === WorkerWrapper.STATES.STOPPED && worker.dead) {
removeWorker();
}
};
/**
* Removes specified worker from queue
* @private
* @param {WorkerWrapper} worker
*/
RestartQueue.prototype._remove = function(worker) {
var idx = this._queue.indexOf(worker);
this._queue.splice(idx, 1);
this._process();
};
worker.on('ready', removeWorker);
worker.on('state', checkDead);
this._queue.push(worker);
this._process();
}
/**
* Checks if any processing is needed. Should be called after each restart queue state change.
* @private
*/
RestartQueue.prototype._process = function() {
if (this._queue.length === 0) {
this.emit('drain');
return;
/**
* Returns true if specified worker is in this queue.
* @public
* @param {WorkerWrapper} worker
* @returns {Boolean}
*/
has(worker) {
return this._queue.indexOf(worker) !== -1;
}
var head = this._queue[0];
if (head.ready) {
head.restart();
/**
* Removes specified worker from queue
* @private
* @param {WorkerWrapper} worker
*/
_remove(worker) {
const idx = this._queue.indexOf(worker);
this._queue.splice(idx, 1);
this._process();
}
};
/**
* Checks if any processing is needed. Should be called after each restart queue state change.
* @private
*/
_process() {
if (this._queue.length === 0) {
this.emit('drain');
return;
}
const head = this._queue[0];
if (head.ready) {
head.restart();
}
}
}
module.exports = RestartQueue;

@@ -1,4 +0,4 @@

var LusterRPCCallbackError = require('./errors').LusterRPCCallbackError;
const LusterRPCCallbackError = require('./errors').LusterRPCCallbackError;
var RPCCallback = {
const RPCCallback = {
_storage: {},

@@ -15,5 +15,4 @@

*/
setCallback: function(proc, command, callback, timeout) {
var self = this,
storage = self._storage;
setCallback(proc, command, callback, timeout) {
const storage = this._storage;

@@ -24,8 +23,8 @@ if ( ! timeout) {

var callbackId = proc.wid + '_' + self._counter++;
const callbackId = proc.wid + '_' + this._counter++;
storage[callbackId] = {
callback: callback,
callback,
timeout:
setTimeout(function() {
setTimeout(() => {
storage[callbackId].callback(

@@ -35,4 +34,4 @@ proc,

LusterRPCCallbackError.CODES.REMOTE_CALL_WITH_CALLBACK_TIMEOUT,
{ command: command }));
self.removeCallback(callbackId);
{ command }));
this.removeCallback(callbackId);
}, timeout)

@@ -49,4 +48,4 @@ };

*/
processCallback: function(proc, callbackId, data) {
var stored = this._storage[callbackId];
processCallback(proc, callbackId, data) {
const stored = this._storage[callbackId];

@@ -57,5 +56,3 @@ if ( ! stored) {

setImmediate(function() {
stored.callback(proc, null, data);
});
setImmediate(() => stored.callback(proc, null, data));
this.removeCallback(callbackId);

@@ -67,4 +64,4 @@ },

*/
removeCallback: function(callbackId) {
var timeout = this._storage[callbackId].timeout;
removeCallback(callbackId) {
const timeout = this._storage[callbackId].timeout;

@@ -71,0 +68,0 @@ clearTimeout(timeout);

/**
* @type {{createCaller: Function, parseMessage: Function}}
*/
var RPC = {
const RPC = {
/**

@@ -9,8 +9,8 @@ * @param {Object} target must have `send` method

*/
createCaller: function(target) {
return function(name) {
var message = { cmd: 'luster_' + name };
createCaller(target) {
return function(name, ...args) {
const message = { cmd: 'luster_' + name };
if (arguments.length > 1) {
message.args = Array.prototype.slice.call(arguments, 1);
if (args.length > 0) {
message.args = args;
}

@@ -32,3 +32,3 @@

*/
parseMessage: function(message) {
parseMessage(message) {
if (message &&

@@ -35,0 +35,0 @@ typeof message.cmd === 'string' &&

@@ -1,2 +0,2 @@

var cluster = require('cluster'),
const cluster = require('cluster'),
RPC = require('./rpc'),

@@ -6,22 +6,14 @@ RPCCallback = require('./rpc-callback'),

Port = require('./port'),
LusterWorkerWrapperError = require('./errors').LusterWorkerWrapperError,
WorkerWrapper,
LusterWorkerWrapperError = require('./errors').LusterWorkerWrapperError;
/**
* Identifier for next constructed WorkerWrapper.
* Usage restricted to WorkerWrapper constructor only.
* @type {Number}
* @private
*/
nextId = 0;
/**
* @event WorkerWrapper#state
* @param {WorkerWrapperState} state Actual WorkerWrapper state
* @see WorkerWrapper.STATES for possible `state` values.
* Identifier for next constructed WorkerWrapper.
* Usage restricted to WorkerWrapper constructor only.
* @type {Number}
* @private
*/
let nextId = 0;
/**
* @memberOf WorkerWrapper
* @typedef WorkerWrapperOptions
* @class WorkerWrapperOptions
* @property {Number|String} port Port number or socket path which worker can listen

@@ -39,8 +31,43 @@ * @property {Boolean} [persistent=true] While `persistent === true` worker will be restarted on exit

*/
class WorkerWrapperOptions {
constructor(options) {
this.persistent = typeof options.persistent === 'undefined' ? true : options.persistent;
this.forkTimeout = options.forkTimeout;
this.stopTimeout = options.stopTimeout;
this.exitThreshold = options.exitThreshold;
this.allowedSequentialDeaths = options.allowedSequentialDeaths || 0;
this.port = options.port;
this.workerEnv = options.workerEnv;
}
get port() {
return this._port;
}
/**
* Setter of `this.options.port` affects value of the `isListeningUnixSocket` property.
* @memberOf WorkerWrapperOptions
* @property {Number|String} port
* @public
*/
set port(value) {
if (!(value instanceof Port)) {
value = new Port(value);
}
return this._port = value;
}
}
/**
* @event WorkerWrapper#state
* @param {WorkerWrapperState} state Actual WorkerWrapper state
* @see WorkerWrapper.STATES for possible `state` values.
*/
/**
* @constructor
* @class WorkerWrapper
* @augments EventEmitterEx
* @param {Master} master
* @param {WorkerPool} pool
* @param {WorkerWrapperOptions} options

@@ -53,12 +80,13 @@ *

*/
WorkerWrapper = EventEmitterEx.create(function WorkerWrapper(master, options) {
WorkerWrapper.__super.call(this);
class WorkerWrapper extends EventEmitterEx {
constructor(pool, options) {
super();
if (options &&
typeof options.maxListeners !== 'undefined' &&
options.maxListeners > this.getMaxListeners()) {
this.setMaxListeners(options.maxListeners);
}
if (options &&
typeof options.maxListeners !== 'undefined' &&
options.maxListeners > this.getMaxListeners()) {
this.setMaxListeners(options.maxListeners);
}
var /**
/**
* WorkerWrapper state. Must be set via private method `WorkerWrapper#_setState(value)`,

@@ -69,42 +97,84 @@ * not directly. Can be retrieved via `WorkerWrapper#state` getter.

*/
_state = WorkerWrapper.STATES.STOPPED,
this._state = WorkerWrapper.STATES.STOPPED;
/** @type {Number|String} */
_port;
/**
* @type WorkerWrapperOptions
* @public
* @readonly
*/
this.options = new WorkerWrapperOptions(options);
/**
* @type WorkerWrapperOptions
* @public
* @readonly
*/
this.options = {
persistent: typeof options.persistent === 'undefined' ? true : options.persistent,
forkTimeout: options.forkTimeout,
stopTimeout: options.stopTimeout,
exitThreshold: options.exitThreshold,
allowedSequentialDeaths: options.allowedSequentialDeaths || 0
};
this._wid = ++nextId;
this.eexKey = this._wid;
/**
* Setter of `this.options.port` affects value of the `isListeningUnixSocket` property.
* @memberOf WorkerWrapper#options
* @property {Number|String} port
* @public
*/
Object.defineProperty(this.options, 'port', {
get: function() {
return _port;
},
set: function(value) {
if ( ! (value instanceof Port)) {
value = new Port(value);
}
/**
* Indicates worker restarting in progress.
* Changing `restarting` property value outside WorkerWrapper and it inheritors is not recommended.
* @property {Boolean} restarting
* @memberOf {WorkerWrapper}
* @public
*/
this.restarting = false;
return _port = value;
},
enumerable: true
});
/**
* Indicates worker stopping in progress.
* Changing `stopping` property value outside WorkerWrapper and it inheritors is not recommended.
* @property {Boolean} stopping
* @memberOf {WorkerWrapper}
* @public
*/
this.stopping = false;
this.options.port = options.port;
/**
* Worker can be marked as dead on sequential fails of launch attempt.
* Dead worker will not be restarted on event WorkerWrapper#state('stopped').
* Internally in the WorkerWrapper worker can be marked as dead, but never go alive again.
* To revive the worker something outside of the WorkerWrapper
* must set the `dead` property value to `false`.
* @public
* @type {Boolean}
*/
this.dead = false;
/**
* Number of sequential deaths when worker life time was less than `exitThreshold` option value.
* @type {Number}
* @private
*/
this._sequentialDeaths = 0;
/**
* Time of the last WorkerWrapper#state('running') event.
* @type {Number}
*/
this.startTime = null;
/**
* Indicates whether ready() method was called in worker
* @public
* @type {Boolean}
*/
this.ready = false;
/**
* @type {WorkerPool}
* @private
*/
this._pool = pool;
/**
* Listen for cluster#fork and worker events.
* @see WorkerWrapper#_proxyEvents to know about repeating worker events on WorkerWrapper instance.
*/
cluster.on('fork', this._onFork.bind(this));
this.on('online', this._onOnline.bind(this));
this.on('disconnect', this._onDisconnect.bind(this));
this.on('exit', this._onExit.bind(this));
WorkerWrapper._RPC_EVENTS.forEach(event => {
pool.master.on('received worker ' + event, this.eventTranslator.bind(this, event));
});
this.on('ready', this._onReady.bind(this));
}
/**

@@ -116,6 +186,5 @@ * @property {Number} wid Persistent WorkerWrapper identifier

*/
Object.defineProperty(this, 'wid', {
value: ++nextId,
enumerable: true
});
get wid() {
return this._wid;
}

@@ -128,8 +197,5 @@ /**

*/
Object.defineProperty(this, 'pid', {
get: function() {
return this.process.pid;
},
enumerable: true
});
get pid() {
return this.process.pid;
}

@@ -144,10 +210,17 @@ /**

*/
Object.defineProperty(this, 'state', {
get: function() {
return _state;
},
enumerable: true
});
get state() {
return this._state;
}
/**
* Pool this WorkerWrapper belongs to
* @memberOf {WorkerWrapper}
* @public
* @readonly
*/
get pool() {
return this._pool;
}
/**
* @see WorkerWrapper.STATES for possible `state` argument values

@@ -158,4 +231,4 @@ * @param {WorkerWrapperState} state

*/
this._setState = function(state) {
_state = state;
_setState(state) {
this._state = state;

@@ -165,251 +238,363 @@ this['_onState' + state[0].toUpperCase() + state.slice(1)]();

this.emit('state', state);
};
}
eventTranslator(event, worker) {
if (this._worker && worker.id === this._worker.id) {
this.emit(event);
}
}
_onReady() {
this.ready = true;
}
/**
* Indicates worker restarting in progress.
* Changing `restarting` property value outside WorkerWrapper and it inheritors is not recommended.
* @property {Boolean} restarting
* @memberOf {WorkerWrapper}
* @public
* @private
*/
this.restarting = false;
_onExit() {
this.ready = false;
this._setState(WorkerWrapper.STATES.STOPPED);
}
/**
* Indicates worker stopping in progress.
* Changing `stopping` property value outside WorkerWrapper and it inheritors is not recommended.
* @property {Boolean} stopping
* @memberOf {WorkerWrapper}
* @public
* event:_worker#disconnect handler
* @private
*/
this.stopping = false;
_onDisconnect() {
this.ready = false;
this._setState(WorkerWrapper.STATES.STOPPING);
}
/**
* Worker can be marked as dead on sequential fails of launch attempt.
* Dead worker will not be restarted on event WorkerWrapper#state('stopped').
* Internally in the WorkerWrapper worker can be marked as dead, but never go alive again.
* To revive the worker something outside of the WorkerWrapper
* must set the `dead` property value to `false`.
* @public
* @type {Boolean}
* @event WorkerWrapper#ready
*/
this.dead = false;
/**
* Number of sequential deaths when worker life time was less than `exitThreshold` option value.
* @type {Number}
* event:_worker#online handler
* @fires WorkerWrapper#ready
* @private
*/
this._sequentialDeaths = 0;
_onOnline() {
this._setState(WorkerWrapper.STATES.RUNNING);
// pass some of the {WorkerWrapper} properties to {Worker}
this.remoteCall(RPC.fns.worker.applyForeignProperties, {
pid: this.process.pid
});
}
/**
* Time of the last WorkerWrapper#state('running') event.
* @type {Number}
* event:cluster#fork handler
* @private
*/
this.startTime = null;
_onFork(worker) {
if (this._worker && worker.id === this._worker.id) {
this.emit('fork');
this._setState(WorkerWrapper.STATES.LAUNCHING);
}
}
/**
* Indicates whether ready() method was called in worker
* @public
* @type {Boolean}
* event:state('launching') handler
* @private
*/
this.ready = false;
_onStateLaunching() {
this.restarting = false;
if (this.options.forkTimeout) {
this.launchTimeout = setTimeout(() => {
this.launchTimeout = null;
if (this._worker !== null) {
this._worker.kill();
}
}, this.options.forkTimeout);
}
}
/**
* @type {Master}
* event:state('running') handler
* @private
*/
this._master = master;
_onStateRunning() {
if (this.launchTimeout) {
clearTimeout(this.launchTimeout);
this.launchTimeout = null;
}
this.startTime = Date.now();
}
/**
* Listen for cluster#fork and worker events.
* @see WorkerWrapper#_proxyEvents to know about repeating worker events on WorkerWrapper instance.
* event:state('stopping') handler
* @private
*/
cluster.on('fork', this._onFork.bind(this));
this.on('online', this._onOnline.bind(this));
this.on('disconnect', this._onDisconnect.bind(this));
this.on('exit', this._onExit.bind(this));
// register global remote command in the context of master to receive events from master
if ( ! master.hasRegisteredRemoteCommand(RPC.fns.master.broadcastWorkerEvent)) {
master.registerRemoteCommand(
RPC.fns.master.broadcastWorkerEvent,
WorkerWrapper.broadcastWorkerEvent.bind(master)
);
_onStateStopping() {
this._scheduleForceStop();
}
WorkerWrapper._RPC_EVENTS.forEach(function(event) {
master.on('received worker ' + event, WorkerWrapper.createEventTranslator(event).bind(this));
}, this);
this.on('ready', this._onReady.bind(this));
});
/**
* event:state('stopped') handler
* @private
*/
_onStateStopped() {
this._cancelForceStop();
WorkerWrapper.createEventTranslator = function(event) {
return /** @this {WorkerWrapper} */function(worker) {
if (this._worker && worker.id === this._worker.id) {
this.emit(event);
// increase sequential deaths count if worker life time less
// than `exitThreshold` option value (and option was passed to constructor).
if (this.options.exitThreshold &&
Date.now() - this.startTime < this.options.exitThreshold &&
!this.restarting) {
this._sequentialDeaths++;
}
};
};
WorkerWrapper.prototype._onReady = function() {
this.ready = true;
};
// mark worker as dead if too much sequential deaths
if (this._sequentialDeaths > this.options.allowedSequentialDeaths) {
this.dead = true;
}
/**
* Broadcast an event received by IPC from worker as 'worker <event>'.
* @this {Master}
* @param {WorkerWrapper} worker
* @param {String} event
*/
WorkerWrapper.broadcastWorkerEvent = function(worker, event) {
var args = ['received worker ' + event, worker],
i = 2,
len = arguments.length;
this._worker = null;
for (; i < len; i++) {
args.push(arguments[i]);
// start worker again if it persistent or in the restarting state
// and isn't marked as dead
if (((this.options.persistent && !this.stopping) || this.restarting) && !this.dead) {
setImmediate(this.run.bind(this));
}
this.stopping = false;
}
this.emit.apply(this, args);
};
/**
* @returns {Boolean}
*/
isRunning() {
return this.state === WorkerWrapper.STATES.LAUNCHING ||
this.state === WorkerWrapper.STATES.RUNNING;
}
/**
* Possible WorkerWrapper instance states.
* @property {Object} STATES
* @memberOf WorkerWrapper
* @typedef WorkerWrapperState
* @enum
* @readonly
* @public
* @static
*/
Object.defineProperty(WorkerWrapper, 'STATES', {
value: Object.freeze({
STOPPED: 'stopped',
LAUNCHING: 'launching',
RUNNING: 'running',
STOPPING: 'stopping'
}),
enumerable: true
});
/**
* @event WorkerWrapper#error
* @param {LusterError} error
*/
/**
* @private
*/
WorkerWrapper.prototype._onExit = function() {
this.ready = false;
this._setState(WorkerWrapper.STATES.STOPPED);
};
/**
* Spawn a worker
* @fires WorkerWrapper#error if worker already running
* @fires WorkerWrapper#state('launching') on success
* @returns {WorkerWrapper} self
*/
run() {
if (this.isRunning()) {
this.emit('error',
LusterWorkerWrapperError.createError(
LusterWorkerWrapperError.CODES.INVALID_ATTEMPT_TO_CHANGE_STATE,
{
wid: this.wid,
pid: this.process.pid,
state: this.state,
targetState: WorkerWrapper.STATES.LAUNCHING
}));
/**
* event:_worker#disconnect handler
* @private
*/
WorkerWrapper.prototype._onDisconnect = function() {
this.ready = false;
this._setState(WorkerWrapper.STATES.STOPPING);
};
return this;
}
/**
* @event WorkerWrapper#ready
*/
setImmediate(() => {
/** @private */
this._worker = cluster.fork(Object.assign({
port: this.options.port,
LUSTER_WID: this.wid,
}, this.options.workerEnv));
/**
* event:_worker#online handler
* @fires WorkerWrapper#ready
* @private
*/
WorkerWrapper.prototype._onOnline = function() {
this._setState(WorkerWrapper.STATES.RUNNING);
/** @private */
this._remoteCall = RPC.createCaller(this._worker);
// pass some of the {WorkerWrapper} properties to {Worker}
this.remoteCall(RPC.fns.worker.applyForeignProperties, {
pid: this.process.pid
});
};
this._proxyEvents();
});
/**
* event:cluster#fork handler
* @private
*/
WorkerWrapper.prototype._onFork = function(worker) {
if (this._worker && worker.id === this._worker.id) {
this.emit('fork');
this._setState(WorkerWrapper.STATES.LAUNCHING);
return this;
}
};
/**
* event:state('launching') handler
* @private
*/
WorkerWrapper.prototype._onStateLaunching = function() {
var self = this;
/**
* Disconnect worker to stop it.
* @fires WorkerWrapper#error if worker status is 'stopped' or 'stopping'
* @fires WorkerWrapper#status('stopping') on success
* @returns {WorkerWrapper}
*/
stop() {
if (!this.isRunning()) {
this.emit('error',
LusterWorkerWrapperError.createError(
LusterWorkerWrapperError.CODES.INVALID_ATTEMPT_TO_CHANGE_STATE,
{
wid: this.wid,
pid: this.process.pid,
state: this.state,
targetState: WorkerWrapper.STATES.STOPPING
}));
this.restarting = false;
return this;
}
if (this.options.forkTimeout) {
this.launchTimeout = setTimeout(function() {
self.launchTimeout = null;
this.stopping = true;
if (self._worker !== null) {
self._worker.kill();
setImmediate(() => {
// state can be changed before function call
if (this.isRunning()) {
this._worker.disconnect();
this._scheduleForceStop();
}
}, this.options.forkTimeout);
});
return this;
}
};
/**
* event:state('running') handler
* @private
*/
WorkerWrapper.prototype._onStateRunning = function() {
if (this.launchTimeout) {
clearTimeout(this.launchTimeout);
this.launchTimeout = null;
/**
* Set WorkerWrapper#restarting to `true` and stop it,
* which leads to worker restart.
*/
restart() {
this.restarting = true;
this.stop();
}
this.startTime = Date.now();
};
/**
* Call Worker method via RPC
* @method
* @param {String} name of called command in the worker
* @param {...*} args
*/
remoteCall(name, ...args) {
if (this.isRunning()) {
this._remoteCall(name, ...args);
} else {
this.emit('error',
LusterWorkerWrapperError.createError(
LusterWorkerWrapperError.CODES.REMOTE_COMMAND_CALL_TO_STOPPED_WORKER,
{
wid: this.wid,
pid: this.process.pid,
command: name
}));
}
}
/**
* event:state('stopping') handler
* @private
*/
WorkerWrapper.prototype._onStateStopping = function() {
this._scheduleForceStop();
};
// proxy some properties to WorkerWrapper#_worker
/**
* event:state('stopped') handler
* @private
*/
WorkerWrapper.prototype._onStateStopped = function() {
this._cancelForceStop();
get id() {
return this._worker.id;
}
// increase sequential deaths count if worker life time less
// than `exitThreshold` option value (and option was passed to constructor).
if (this.options.exitThreshold &&
Date.now() - this.startTime < this.options.exitThreshold &&
! this.restarting) {
this._sequentialDeaths++;
get process() {
return this._worker.process;
}
// mark worker as dead if too much sequential deaths
if (this._sequentialDeaths > this.options.allowedSequentialDeaths) {
this.dead = true;
get suicide() {
return this._worker.suicide;
}
this._worker = null;
// proxy some methods to WorkerWrapper#_worker
send(...args) {
this._worker.send(...args);
}
// start worker again if it persistent or in the restarting state
// and isn't marked as dead
if (((this.options.persistent && ! this.stopping) || this.restarting) && ! this.dead) {
setImmediate(this.run.bind(this));
disconnect() {
this._worker.disconnect();
}
this.stopping = false;
};
/**
* repeat events from WorkerWrapper#_worker on WorkerWrapper
* @private
*/
_proxyEvents() {
WorkerWrapper._PROXY_EVENTS
.forEach(eventName => {
this._worker.on(eventName, this.emit.bind(this, eventName));
});
}
inspect() {
return 'WW{ id:' + this.wid + ', state: ' + this.state + '}';
}
broadcastEvent(...args) {
//TODO args here passed as single array, but remoteCall can handle multiple args
this.remoteCall(RPC.fns.worker.broadcastMasterEvent, args);
}
/**
* Do a remote call to worker, wait for worker to handle it, then execute registered callback
* @method
* @param {String} opts.command
* @param {Function} opts.callback
* @param {Number} [opts.timeout] in milliseconds
* @param {*} [opts.data]
* @public
*/
remoteCallWithCallback(opts) {
const callbackId = RPCCallback.setCallback(this, opts.command, opts.callback, opts.timeout);
this.remoteCall(opts.command, opts.data, callbackId);
}
/**
* Schedule a forceful worker stop using signal.
* Only schedules timeout if it was not set yet.
* @private
*/
_scheduleForceStop() {
// We could schedule force stop either when `stop` method is called or when `disconnected` event received from
// worker. In most cases `stop` will be called and then `disconnected` event will fire, therefore we shall
// make sure we do not re-run the force stop timer.
if (this.options.stopTimeout && !this.stopTimeout) {
this.stopTimeout = setTimeout(() => {
this.stopTimeout = null;
if (this._worker !== null) {
this._worker.process.kill();
}
}, this.options.stopTimeout);
}
}
/**
* Clears a forceful worker stop.
* @private
*/
_cancelForceStop() {
if (this.stopTimeout) {
clearTimeout(this.stopTimeout);
this.stopTimeout = null;
}
}
/**
* Adds this worker to pool's restart queue
* @public
*/
softRestart() {
this._pool.scheduleWorkerRestart(this);
}
}
/**
* Possible WorkerWrapper instance states.
* @property {Object} STATES
* @memberOf WorkerWrapper
* @typedef WorkerWrapperState
* @enum
* @readonly
* @public
* @static
*/
Object.defineProperty(WorkerWrapper, 'STATES', {
value: Object.freeze({
STOPPED: 'stopped',
LAUNCHING: 'launching',
RUNNING: 'running',
STOPPING: 'stopping'
}),
enumerable: true
});
/**
* Events to repeat from WorkerWrapper#_worker on WorkerWrapper instance

@@ -422,3 +607,9 @@ * @memberOf WorkerWrapper

Object.defineProperty(WorkerWrapper, '_PROXY_EVENTS', {
value: Object.freeze(['message', 'online', 'listening', 'disconnect', 'exit']),
value: Object.freeze([
'message',
'online',
'listening',
'disconnect',
'exit'
]),
enumerable: true

@@ -460,211 +651,2 @@ });

/**
* @returns {Boolean}
*/
WorkerWrapper.prototype.isRunning = function() {
return this.state === WorkerWrapper.STATES.LAUNCHING ||
this.state === WorkerWrapper.STATES.RUNNING;
};
/**
* @event WorkerWrapper#error
* @param {LusterError} error
*/
/**
* Spawn a worker
* @fires WorkerWrapper#error if worker already running
* @fires WorkerWrapper#state('launching') on success
* @returns {WorkerWrapper} self
*/
WorkerWrapper.prototype.run = function() {
if (this.isRunning()) {
this.emit('error',
LusterWorkerWrapperError.createError(
LusterWorkerWrapperError.CODES.INVALID_ATTEMPT_TO_CHANGE_STATE,
{
wid: this.wid,
pid: this.process.pid,
state: this.state,
targetState: WorkerWrapper.STATES.LAUNCHING
}));
return this;
}
var self = this;
setImmediate(function() {
/** @private */
self._worker = cluster.fork({
port: self.options.port,
LUSTER_WID: self.wid,
});
/** @private */
self._remoteCall = RPC.createCaller(self._worker);
self._proxyEvents();
});
return this;
};
/**
* Disconnect worker to stop it.
* @fires WorkerWrapper#error if worker status is 'stopped' or 'stopping'
* @fires WorkerWrapper#status('stopping') on success
* @returns {WorkerWrapper}
*/
WorkerWrapper.prototype.stop = function() {
if ( ! this.isRunning()) {
this.emit('error',
LusterWorkerWrapperError.createError(
LusterWorkerWrapperError.CODES.INVALID_ATTEMPT_TO_CHANGE_STATE,
{
wid: this.wid,
pid: this.process.pid,
state: this.state,
targetState: WorkerWrapper.STATES.STOPPING
}));
return this;
}
this.stopping = true;
var self = this;
setImmediate(function() {
// state can be changed before function call
if (self.isRunning()) {
self._worker.disconnect();
self._scheduleForceStop();
}
});
return this;
};
/**
* Set WorkerWrapper#restarting to `true` and stop it,
* which leads to worker restart.
*/
WorkerWrapper.prototype.restart = function() {
this.restarting = true;
this.stop();
};
/**
* Call Worker method via RPC
* @method
* @param {String} name of called command in the worker
* @param {*} ...args
*/
WorkerWrapper.prototype.remoteCall = function(name) {
if (this.isRunning()) {
this._remoteCall.apply(this, arguments);
} else {
this.emit('error',
LusterWorkerWrapperError.createError(
LusterWorkerWrapperError.CODES.REMOTE_COMMAND_CALL_TO_STOPPED_WORKER,
{
wid: this.wid,
pid: this.process.pid,
command: name
}));
}
};
// proxy some properties to WorkerWrapper#_worker
['id', 'process', 'suicide'].forEach(function(propName) {
Object.defineProperty(WorkerWrapper.prototype, propName, {
get: function() {
return this._worker[propName];
},
enumerable: true
});
});
// proxy some methods to WorkerWrapper#_worker
['send', 'disconnect'].forEach(function(methodName) {
WorkerWrapper.prototype[methodName] = function() {
this._worker[methodName].apply(this._worker, arguments);
};
});
/**
* repeat events from WorkerWrapper#_worker on WorkerWrapper
* @private
*/
WorkerWrapper.prototype._proxyEvents = function() {
WorkerWrapper._PROXY_EVENTS
.forEach(function(eventName) {
this._worker.on(eventName, this.emit.bind(this, eventName));
}, this);
};
WorkerWrapper.prototype.inspect = function() {
return 'WW{ id:' + this.wid + ', state: ' + this.state + '}';
};
WorkerWrapper.prototype.broadcastEvent = function() {
this.remoteCall(RPC.fns.worker.broadcastMasterEvent, Array.prototype.slice.call(arguments, 0));
};
/**
* Do a remote call to worker, wait for worker to handle it, then execute registered callback
* @method
* @param {String} opts.command
* @param {Function} opts.callback
* @param {Number} [opts.timeout] in milliseconds
* @param {*} [opts.data]
* @public
*/
WorkerWrapper.prototype.remoteCallWithCallback = function(opts) {
var callbackId = RPCCallback.setCallback(this, opts.command, opts.callback, opts.timeout);
this.remoteCall(opts.command, opts.data, callbackId);
};
/**
* Schedule a forceful worker stop using signal.
* Only schedules timeout if it was not set yet.
* @private
*/
WorkerWrapper.prototype._scheduleForceStop = function() {
// We could schedule force stop either when `stop` method is called or when `disconnected` event received from
// worker. In most cases `stop` will be called and then `disconnected` event will fire, therefore we shall
// make sure we do not re-run the force stop timer.
if (this.options.stopTimeout && ! this.stopTimeout) {
var self = this;
this.stopTimeout = setTimeout(function() {
self.stopTimeout = null;
if (self._worker !== null) {
self._worker.process.kill();
}
}, this.options.stopTimeout);
}
};
/**
* Clears a forceful worker stop.
* @private
*/
WorkerWrapper.prototype._cancelForceStop = function() {
if (this.stopTimeout) {
clearTimeout(this.stopTimeout);
this.stopTimeout = null;
}
};
/**
* Adds this worker to master's restart queue
* @public
*/
WorkerWrapper.prototype.softRestart = function() {
this._master.scheduleWorkerRestart(this);
};
module.exports = WorkerWrapper;

@@ -1,8 +0,11 @@

var cluster = require('cluster'),
const cluster = require('cluster'),
RPC = require('./rpc'),
RPCCallback = require('./rpc-callback'),
ClusterProcess = require('./cluster_process'),
LusterWorkerError = require('./errors').LusterWorkerError,
Worker;
LusterWorkerError = require('./errors').LusterWorkerError;
const pEvent = require('p-event');
const wid = parseInt(process.env.LUSTER_WID, 10);
/**

@@ -13,5 +16,25 @@ * @constructor

*/
Worker = ClusterProcess.create(function Worker() {
Worker.__super.apply(this, arguments);
class Worker extends ClusterProcess {
constructor() {
super();
this.eexKey = wid;
const broadcastEvent = this._broadcastEvent;
this._foreignPropertiesReceivedPromise = pEvent(this, 'foreign properties received');
this.on('configured', broadcastEvent.bind(this, 'configured'));
this.on('extension loaded', broadcastEvent.bind(this, 'extension loaded'));
this.on('initialized', broadcastEvent.bind(this, 'initialized'));
this.on('loaded', broadcastEvent.bind(this, 'loaded'));
this.on('ready', broadcastEvent.bind(this, 'ready'));
cluster.worker.on('disconnect', this.emit.bind(this, 'disconnect'));
this._ready = false;
this.registerRemoteCommand(RPC.fns.worker.applyForeignProperties, this.applyForeignProperties.bind(this));
this.registerRemoteCommand(RPC.fns.worker.broadcastMasterEvent, this.broadcastMasterEvent.bind(this));
}
/**

@@ -23,6 +46,5 @@ * @memberOf {Worker}

*/
Object.defineProperty(this, 'wid', {
value: parseInt(process.env.LUSTER_WID, 10),
enumerable: true
});
get wid(){
return wid;
}

@@ -36,113 +58,119 @@ /**

*/
Object.defineProperty(this, 'id', {
value: cluster.worker.id,
enumerable: true
});
get id() {
return cluster.worker.id;
}
var broadcastEvent = this._broadcastEvent;
/**
* Emit an event received from the master as 'master <event>'.
*/
broadcastMasterEvent(proc, emitArgs) {
const [eventName, ...eventArgs] = emitArgs;
this.emit('master ' + eventName, ...eventArgs);
}
this._foreignPropertiesReceived = false;
this.on('foreign properties received', function() { this._foreignPropertiesReceived = true; }.bind(this));
/**
* Transmit worker event to master, which plays as relay,
* retransmitting it as 'worker <event>' to all master-side listeners.
* @param {String} event Event name
* @param {...*} args
* @private
*/
_broadcastEvent(event, ...args) {
this.remoteCall(RPC.fns.master.broadcastWorkerEvent, event, ...args);
}
this.on('configured', broadcastEvent.bind(this, 'configured'));
this.on('extension loaded', broadcastEvent.bind(this, 'extension loaded'));
this.on('initialized', broadcastEvent.bind(this, 'initialized'));
this.on('loaded', broadcastEvent.bind(this, 'loaded'));
this.on('ready', broadcastEvent.bind(this, 'ready'));
cluster.worker.on('disconnect', this.emit.bind(this, 'disconnect'));
/**
* Extend {Worker} properties with passed by {Master}.
* @param {ClusterProcess} proc
* @param {*} props
*/
applyForeignProperties(proc, props) {
for (const propName of Object.keys(props)) {
Object.defineProperty(this, propName, {
value: props[propName],
enumerable: true
});
}
this.emit('foreign properties received');
}
this._ready = false;
whenForeignPropertiesReceived() {
return this._foreignPropertiesReceivedPromise;
}
this.registerRemoteCommand(RPC.fns.worker.applyForeignProperties, this.applyForeignProperties.bind(this));
this.registerRemoteCommand(RPC.fns.worker.broadcastMasterEvent, this.broadcastMasterEvent.bind(this));
});
/**
* @override
* @see ClusterProcess
* @private
*/
_setupIPCMessagesHandler() {
process.on('message', this._onMessage.bind(this, this));
}
/**
* Emit an event received from the master as 'master <event>'.
*/
Worker.prototype.broadcastMasterEvent = function(proc, emitArgs) {
var args = ['master ' + emitArgs[0]].concat(emitArgs.slice(1));
this.emit.apply(this, args);
};
/**
* Turns worker to `ready` state. Must be called by worker
* if option `control.triggerReadyStateManually` set `true`.
* @returns {Worker} self
* @public
*/
ready() {
if (this._ready) {
throw new LusterWorkerError(LusterWorkerError.CODES.ALREADY_READY);
}
/**
* Transmit worker event to master, which plays as relay,
* retransmitting it as 'worker <event>' to all master-side listeners.
* @param {String} event Event name
* @private
*/
Worker.prototype._broadcastEvent = function(event) { // eslint-disable-line no-unused-vars
var args = [RPC.fns.master.broadcastWorkerEvent],
i = 0,
len = arguments.length;
this._ready = true;
this.emit('ready');
for (; i < len; i++) {
args.push(arguments[i]);
return this;
}
this.remoteCall.apply(this, args);
};
/**
* Do a remote call to master, wait for master to handle it, then execute registered callback
* @method
* @param {String} opts.command
* @param {Function} opts.callback
* @param {Number} [opts.timeout] in milliseconds
* @param {*} [opts.data]
* @public
*/
remoteCallWithCallback(opts) {
const callbackId = RPCCallback.setCallback(this, opts.command, opts.callback, opts.timeout);
/**
* Extend {Worker} properties with passed by {Master}.
* @param {ClusterProcess} proc
* @param {*} props
*/
Worker.prototype.applyForeignProperties = function(proc, props) {
Object.keys(props)
.forEach(function(propName) {
Object.defineProperty(this, propName, {
value: props[propName],
enumerable: true
});
}, this);
this.emit('foreign properties received');
};
this.remoteCall(opts.command, opts.data, callbackId);
}
/**
* @param {Function} fn
*/
Worker.whenForeignPropertiesReceived = function(fn) {
return /** @this {Worker} */function() {
if (this._foreignPropertiesReceived) {
setImmediate(fn.bind(this));
} else {
this.once('foreign properties received', fn.bind(this));
}
async _run() {
await this.whenInitialized();
await this.whenForeignPropertiesReceived();
return this;
};
};
const workerBase = this.config.resolve('app');
/**
* `Require` application main script.
* Execution will be delayed until Worker became configured
* (`configured` event fired).
* @returns {Worker} self
* @public
*/
Worker.prototype.run = Worker.whenInitialized(
Worker.whenForeignPropertiesReceived(function() {
var workerBase = this.config.resolve('app');
require(workerBase);
try {
require(workerBase);
} catch (e) {
console.error(`Worker failed on require ${workerBase}`);
console.error(e);
process.exit(1);
}
this.emit('loaded', workerBase);
if ( ! this.config.get('control.triggerReadyStateManually', false)) {
if (!this.config.get('control.triggerReadyStateManually', false)) {
setImmediate(this.ready.bind(this));
}
}
/**
* `Require` application main script.
* Execution will be delayed until Worker became configured
* (`configured` event fired).
* @returns {Worker} self
* @public
*/
run() {
this._run();
return this;
}));
}
}
/**
* @override
* @see ClusterProcess
* @private
*/
Worker.prototype._setupIPCMessagesHandler = function() {
process.on('message', this._onMessage.bind(this, this));
};
/**
* Call Master method via RPC

@@ -155,34 +183,2 @@ * @method

/**
* Turns worker to `ready` state. Must be called by worker
* if option `control.triggerReadyStateManually` set `true`.
* @returns {Worker} self
* @public
*/
Worker.prototype.ready = function() {
if (this._ready) {
throw new LusterWorkerError(LusterWorkerError.CODES.ALREADY_READY);
}
this._ready = true;
this.emit('ready');
return this;
};
/**
* Do a remote call to master, wait for master to handle it, then execute registered callback
* @method
* @param {String} opts.command
* @param {Function} opts.callback
* @param {Number} [opts.timeout] in milliseconds
* @param {*} [opts.data]
* @public
*/
Worker.prototype.remoteCallWithCallback = function(opts) {
var callbackId = RPCCallback.setCallback(this, opts.command, opts.callback, opts.timeout);
this.remoteCall(opts.command, opts.data, callbackId);
};
module.exports = Worker;
{
"name": "luster",
"version": "2.0.1",
"version": "3.0.0-alpha.0",
"description": "Node.js cluster wrapper",

@@ -10,3 +10,3 @@ "main": "./lib/luster.js",

"scripts": {
"lint": "eslint ./lib ./test ./examples",
"lint": "eslint ./lib ./test ./examples ./bin",
"unit": "istanbul test _mocha -- test/unit/test",

@@ -40,4 +40,4 @@ "func": "mocha test/func/test $@",

"dependencies": {
"extend": "^3.0.0",
"objex": "^0.4.1",
"delay": "^3.0.0",
"p-event": "^2.1.0",
"terror": "^1.0.0"

@@ -47,3 +47,4 @@ },

"chai": "^3.5.0",
"eslint": "^3.15.0",
"chai-as-promised": "^7.1.1",
"eslint": "^4.19.1",
"eslint-config-nodules": "^0.4.0",

@@ -56,4 +57,4 @@ "istanbul": "^0.4.1",

"engines": {
"node": ">=4"
"node": ">=8"
}
}
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc