Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

luster

Package Overview
Dependencies
Maintainers
6
Versions
50
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

luster - npm Package Compare versions

Comparing version 3.0.0-alpha.2 to 3.0.0-alpha.3

lib/worker-pool.js

19

lib/cluster_process.js

@@ -10,2 +10,4 @@ const cluster = require('cluster'),

const pEvent = require('p-event');
/**

@@ -63,5 +65,3 @@ * @param {Object} context

* */
this._initPromise = new Promise(resolve => {
this.once('initialized', resolve);
});
this._initPromise = pEvent(this, 'initialized');

@@ -115,2 +115,11 @@ /**

configure(config, applyEnv, basedir) {
this._setConfig(config, applyEnv, basedir);
if (this.config) {
this.emit('configured');
}
return this;
}
_setConfig(config, applyEnv, basedir) {
if (typeof applyEnv === 'undefined' || applyEnv) {

@@ -134,7 +143,3 @@ Configuration.applyEnvironment(config);

this.setMaxListeners(this.config.get('maxEventListeners', 100));
this.emit('configured');
}
return this;
}

@@ -141,0 +146,0 @@

@@ -23,2 +23,6 @@ const path = require('path'),

get raw() {
return this._rawConfig;
}
/**

@@ -97,2 +101,12 @@ * @param {String} path

extend(config) {
return new Configuration(
{
...this.raw,
...config,
},
this._resolveBaseDir,
);
}
/**

@@ -99,0 +113,0 @@ * Override config properties using `LUSTER_CONF` environment variable.

@@ -97,2 +97,24 @@ const Terror = require('terror'),

/**
* @constructor
* @class LusterMasterError
* @augments LusterError
*/
errors.LusterMasterError = LusterError.create('LusterMasterError',
{
POOL_KEY_ALREADY_TAKEN:
'Pool key "%key%" is already taken'
});
/**
* @constructor
* @class LusterMasterError
* @augments LusterError
*/
errors.LusterMasterError = LusterError.create('LusterMasterError',
{
POOL_DOES_NOT_EXIST:
'Pool with key "%key%" does not exist'
});
module.exports = errors;

@@ -20,4 +20,6 @@ const util = require('util'),

console.log('%s(%s).emit(%s)', this.constructor.name || 'EventEmitterEx', this.wid, inspectedArgs);
const key = this.eexKey;
console.log('%s(%s).emit(%s)', this.constructor.name || 'EventEmitterEx', key, inspectedArgs);
return EventEmitter.prototype.emit.apply(this, args);

@@ -24,0 +26,0 @@ };

@@ -1,9 +0,11 @@

const os = require('os'),
cluster = require('cluster'),
ClusterProcess = require('./cluster_process'),
WorkerWrapper = require('./worker_wrapper'),
Port = require('./port'),
RestartQueue = require('./restart_queue'),
RPC = require('./rpc');
const cluster = require('cluster');
const ClusterProcess = require('./cluster_process');
const LusterMasterError = require('./errors').LusterMasterError;
const RPC = require('./rpc');
const WorkerPool = require('./worker-pool');
const WorkerWrapper = require('./worker_wrapper');
const DEFAULT_POOL_KEY = '__default';
/**

@@ -19,17 +21,2 @@ * @constructor

/**
* @type {Object}
* @property {WorkerWrapper} *
* @public
* @todo make it private or public immutable
*/
this.workers = {};
/**
* Workers restart queue.
* @type {RestartQueue}
* @private
*/
this._restartQueue = new RestartQueue();
/**
* Configuration object to pass to cluster.setupMaster()

@@ -41,9 +28,9 @@ * @type {Object}

this.pools = new Map();
this.id = 0;
this.wid = 0;
this.eexKey = 0;
this.pid = process.pid;
this.on('worker state', this._cleanupUnixSockets.bind(this));
this.on('worker exit', this._checkWorkersAlive.bind(this));
// @todo make it optional?

@@ -55,2 +42,48 @@ process.on('SIGINT', this._onSignalQuit.bind(this));

/**
* @param {*} key
* @returns {WorkerPool}
*/
createPool(key) {
if (this.pools.has(key)) {
throw LusterMasterError.createError(
LusterMasterError.CODES.POOL_KEY_ALREADY_TAKEN,
{key}
);
}
this.emit('create pool', key);
const pool = new WorkerPool(key, this);
this._proxyWorkerEvents(pool);
pool.on('shutdown', this._checkPoolsAlive.bind(this));
this.pools.set(key, pool);
return pool;
}
/**
* @param {*} key
* @returns {WorkerPool}
*/
removePool(key) {
const pool = this.pools.get(key);
if (!pool) {
throw LusterMasterError.createError(
LusterMasterError.CODES.POOL_DOES_NOT_EXIST,
{key}
);
}
this.emit('remove pool', key);
this.pools.delete(key);
return pool;
}
getPool(key) {
return this.pools.get(key);
}
getDefaultPool() {
return this.getPool(DEFAULT_POOL_KEY);
}
/**
* Allows same object structure as cluster.setupMaster().

@@ -78,43 +111,10 @@ * This function must be used instead of cluster.setupMaster(),

/**
* Remove not used unix socket before worker will try to listen it.
* @param {WorkerWrapper} worker
* @param {WorkerWrapperState} state
* @private
*/
_cleanupUnixSockets(worker, state) {
const port = worker.options.port;
if (this._restartQueue.has(worker) ||
state !== WorkerWrapper.STATES.LAUNCHING ||
port.family !== Port.UNIX) {
return;
}
const inUse = this.getWorkersArray().some(w =>
worker.wid !== w.wid &&
w.isRunning() &&
port.isEqualTo(w.options.port)
);
if (!inUse) {
port.unlink(err => {
if (err) {
this.emit('error', err);
}
});
}
}
/**
* Check for alive workers, if no one here, then emit "shutdown".
* @private
*/
_checkWorkersAlive() {
const workers = this.getWorkersArray(),
alive = workers.reduce(
(count, w) => w.dead ? count - 1 : count,
workers.length
);
_checkPoolsAlive() {
let dead = true;
this.forEachPool(pool => dead = dead && pool.dead);
if (alive === 0) {
if (dead) {
this.emit('shutdown');

@@ -125,13 +125,12 @@ }

/**
* Repeat WorkerWrapper events on Master and add 'worker ' prefix to event names
* Repeat WorkerWrapper events from WorkerPool on Master
* so for example 'online' became 'worker online'
* @private
* @param {WorkerWrapper} worker
* @param {WorkerPool} pool
*/
_proxyWorkerEvents(worker) {
WorkerWrapper.EVENTS
.forEach(eventName => {
const proxyEventName = 'worker ' + eventName;
worker.on(eventName, this.emit.bind(this, proxyEventName, worker));
});
_proxyWorkerEvents(pool) {
for (const eventName of WorkerWrapper.EVENTS) {
const proxyEventName = 'worker ' + eventName;
pool.on(proxyEventName, this.emit.bind(this, proxyEventName));
}
}

@@ -143,7 +142,3 @@

getWorkersIds() {
if (!this._workersIdsCache) {
this._workersIdsCache = this.getWorkersArray().map(w => w.wid);
}
return this._workersIdsCache;
return this.getWorkersArray().map(w => w.wid);
}

@@ -155,24 +150,13 @@

getWorkersArray() {
if (!this._workersArrayCache) {
this._workersArrayCache = Object.values(this.workers);
}
return this._workersArrayCache;
let result = [];
this.forEachPool(
pool => result = result.concat(pool.getWorkersArray())
);
return result;
}
/**
* Add worker to the pool
* @param {WorkerWrapper} worker
* @returns {Master} self
* @public
*/
add(worker) {
// invalidate Master#getWorkersIds and Master#getWorkersArray cache
this._workersIdsCache = null;
this._workersArrayCache = null;
this.workers[worker.wid] = worker;
this._proxyWorkerEvents(worker);
return this;
forEachPool(fn) {
for (const pool of this.pools.values()) {
fn(pool);
}
}

@@ -190,4 +174,5 @@

forEach(fn) {
this.getWorkersArray()
.forEach(fn);
this.forEachPool(pool => {
pool.forEach(fn);
});

@@ -207,2 +192,16 @@ return this;

configure(...args) {
this._setConfig(...args);
if (this.config) {
if (this.config.get('useDefaultPool', true)) {
this.createPool(DEFAULT_POOL_KEY).configure(this.config);
}
this.emit('configured');
}
return this;
}
/**

@@ -223,44 +222,2 @@ * Configure cluster

}
const // WorkerWrapper options
forkTimeout = this.config.get('control.forkTimeout'),
stopTimeout = this.config.get('control.stopTimeout'),
exitThreshold = this.config.get('control.exitThreshold'),
allowedSequentialDeaths = this.config.get('control.allowedSequentialDeaths'),
count = this.config.get('workers', os.cpus().length),
isServerPortSet = this.config.has('server.port'),
groups = this.config.get('server.groups', 1),
workersPerGroup = Math.floor(count / groups);
let port,
// workers and groups count
i = 0,
group = 0,
workersInGroup = 0;
if (isServerPortSet) {
port = new Port(this.config.get('server.port'));
}
// create pool of workers
while (count > i++) {
this.add(new WorkerWrapper(this, {
forkTimeout,
stopTimeout,
exitThreshold,
allowedSequentialDeaths,
port: isServerPortSet ? port.next(group) : 0,
maxListeners: this.getMaxListeners(),
}));
// groups > 1, current group is full and
// last workers can form at least more one group
if (groups > 1 &&
++workersInGroup >= workersPerGroup &&
count - (group + 1) * workersPerGroup >= workersPerGroup) {
workersInGroup = 0;
group++;
}
}
}

@@ -275,19 +232,8 @@

waitForWorkers(wids, event) {
const pendingWids = new Set(wids);
const promises = [];
this.forEachPool(
pool => promises.push(pool.waitForWorkers(wids, event))
);
return new Promise(resolve => {
if (pendingWids.size === 0) {
resolve();
}
const onWorkerState = worker => {
const wid = worker.wid;
pendingWids.delete(wid);
if (pendingWids.size === 0) {
this.removeListener(event, onWorkerState);
resolve();
}
};
this.on(event, onWorkerState);
});
return Promise.all(promises);
}

@@ -317,3 +263,3 @@

// TODO maybe run this after starting waitForAllWorkers
this.forEach(worker => worker.restart());
this.forEachPool(pool => pool.restart());

@@ -337,2 +283,12 @@ await this.waitForAllWorkers('worker ready');

async _softRestart() {
const promises = [];
this.forEachPool(
pool => promises.push(pool.softRestart())
);
await Promise.all(promises);
this.emit('restarted');
}
/**

@@ -347,4 +303,3 @@ * Workers will be restarted one by one using RestartQueue.

softRestart() {
this.forEach(worker => worker.softRestart());
this._restartQueue.once('drain', this.emit.bind(this, 'restarted'));
this._softRestart();
return this;

@@ -354,15 +309,2 @@ }

/**
* Schedules one worker restart using RestartQueue.
* If a worker becomes dead, it will be just removed from restart queue. However, if already dead worker is pushed
* into the queue, it will emit 'error' on restart.
* @public
* @param {WorkerWrapper} worker
* @returns {Master} self
*/
scheduleWorkerRestart(worker) {
this._restartQueue.push(worker);
return this;
}
/**
* @override

@@ -384,11 +326,3 @@ * @see ClusterProcess

remoteCallToAll(name, ...args) {
this.forEach(worker => {
if (worker.ready) {
worker.remoteCall(name, ...args);
} else {
worker.on('ready', () => {
worker.remoteCall(name, ...args);
});
}
});
this.forEachPool(pool => pool.remoteCallToAll(name, ...args));
}

@@ -404,7 +338,3 @@

broadcastEventToAll(event, ...args) {
this.forEach(worker => {
if (worker.ready) {
worker.broadcastEvent(event, ...args);
}
});
this.forEachPool(pool => pool.broadcastEventToAll(event, ...args));
}

@@ -421,3 +351,3 @@

this.emit(event, ...args);
this.broadcastEventToAll(event, ...args);
this.forEachPool(pool => pool.emitToAll(event, ...args));
}

@@ -430,16 +360,9 @@

async _shutdown() {
const stoppedWorkers = [];
const promises = [];
this.forEachPool(
pool => promises.push(pool._shutdown())
);
this.forEach(worker => {
if (worker.isRunning()) {
worker.stop();
stoppedWorkers.push(worker.wid);
}
});
await Promise.all(promises);
await this.waitForWorkers(
stoppedWorkers,
'worker exit',
);
this.emit('shutdown');

@@ -468,7 +391,3 @@ }

remoteCallToAllWithCallback(opts) {
this.forEach(worker => {
if (worker.isRunning()) {
worker.remoteCallWithCallback(opts);
}
});
this.forEachPool(pool => pool.remoteCallToAllWithCallback(opts));
}

@@ -482,3 +401,3 @@

// TODO maybe run this after starting waitForAllWorkers
this.forEach(worker => worker.run());
this.forEachPool(pool => pool.run());

@@ -485,0 +404,0 @@ await this.waitForAllWorkers('worker ready');

@@ -18,3 +18,3 @@ const EventEmitterEx = require('./event_emitter_ex'),

class RestartQueue extends EventEmitterEx {
constructor() {
constructor(eexKey = undefined) {
super();

@@ -27,2 +27,4 @@

this._queue = [];
this.eexKey = eexKey;
}

@@ -29,0 +31,0 @@

@@ -38,2 +38,3 @@ const cluster = require('cluster'),

this.port = options.port;
this.workerEnv = options.workerEnv;
}

@@ -70,3 +71,3 @@

* @augments EventEmitterEx
* @param {Master} master
* @param {WorkerPool} pool
* @param {WorkerWrapperOptions} options

@@ -80,3 +81,3 @@ *

class WorkerWrapper extends EventEmitterEx {
constructor(master, options) {
constructor(pool, options) {
super();

@@ -106,2 +107,3 @@

this._wid = ++nextId;
this.eexKey = this._wid;

@@ -158,6 +160,6 @@ /**

/**
* @type {Master}
* @type {WorkerPool}
* @private
*/
this._master = master;
this._pool = pool;

@@ -174,3 +176,3 @@ /**

WorkerWrapper._RPC_EVENTS.forEach(event => {
master.on('received worker ' + event, WorkerWrapper.createEventTranslator(event).bind(this));
pool.master.on('received worker ' + event, this.eventTranslator.bind(this, event));
});

@@ -213,2 +215,12 @@ this.on('ready', this._onReady.bind(this));

/**
* Pool this WorkerWrapper belongs to
* @memberOf {WorkerWrapper}
* @public
* @readonly
*/
get pool() {
return this._pool;
}
/**
* @see WorkerWrapper.STATES for possible `state` argument values

@@ -227,8 +239,6 @@ * @param {WorkerWrapperState} state

static createEventTranslator(event) {
return /** @this {WorkerWrapper} */function(worker) {
if (this._worker && worker.id === this._worker.id) {
this.emit(event);
}
};
eventTranslator(event, worker) {
if (this._worker && worker.id === this._worker.id) {
this.emit(event);
}
}

@@ -392,6 +402,6 @@

/** @private */
this._worker = cluster.fork({
this._worker = cluster.fork(Object.assign({
port: this.options.port,
LUSTER_WID: this.wid,
});
}, this.options.workerEnv));

@@ -561,7 +571,7 @@ /** @private */

/**
* Adds this worker to master's restart queue
* Adds this worker to pool's restart queue
* @public
*/
softRestart() {
this._master.scheduleWorkerRestart(this);
this._pool.scheduleWorkerRestart(this);
}

@@ -568,0 +578,0 @@ }

@@ -7,2 +7,4 @@ const cluster = require('cluster'),

const pEvent = require('p-event');
const wid = parseInt(process.env.LUSTER_WID, 10);

@@ -19,9 +21,7 @@

this.eexKey = wid;
const broadcastEvent = this._broadcastEvent;
this._foreignPropertiesReceivedPromise = new Promise(resolve => {
this.once('foreign properties received', () => {
resolve();
});
});
this._foreignPropertiesReceivedPromise = pEvent(this, 'foreign properties received');

@@ -147,3 +147,9 @@ this.on('configured', broadcastEvent.bind(this, 'configured'));

require(workerBase);
try {
require(workerBase);
} catch (e) {
console.error(`Worker failed on require ${workerBase}`);
console.error(e);
process.exit(1);
}
this.emit('loaded', workerBase);

@@ -150,0 +156,0 @@

{
"name": "luster",
"version": "3.0.0-alpha.2",
"version": "3.0.0-alpha.3",
"description": "Node.js cluster wrapper",

@@ -39,2 +39,4 @@ "main": "./lib/luster.js",

"dependencies": {
"delay": "^3.0.0",
"p-event": "^2.1.0",
"terror": "^1.0.0"

@@ -44,2 +46,3 @@ },

"chai": "^3.5.0",
"chai-as-promised": "^7.1.1",
"eslint": "^4.19.1",

@@ -46,0 +49,0 @@ "eslint-config-nodules": "^0.4.0",

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc