Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

luster

Package Overview
Dependencies
Maintainers
5
Versions
50
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

luster - npm Package Compare versions

Comparing version 3.0.0-alpha.1 to 3.0.0-alpha.2

7

bin/luster.js
#!/usr/bin/env node
const /** @type {ClusterProcess} */
luster = require('../lib/luster'),
path = require('path'),
configFilePath = path.resolve(process.cwd(), process.argv[2] || 'luster.conf');
path = require('path');
// config path is right after this script in process.argv
const scriptArgvIndex = process.argv.findIndex(arg => arg === __filename || path.resolve(arg) === __filename);
const configFilePath = path.resolve(process.cwd(), process.argv[scriptArgvIndex + 1] || 'luster.conf');
luster.configure(require(configFilePath), true, path.dirname(configFilePath)).run();

@@ -10,4 +10,2 @@ const cluster = require('cluster'),

const pEvent = require('p-event');
/**

@@ -65,3 +63,5 @@ * @param {Object} context

* */
this._initPromise = pEvent(this, 'initialized');
this._initPromise = new Promise(resolve => {
this.once('initialized', resolve);
});

@@ -68,0 +68,0 @@ /**

@@ -23,6 +23,2 @@ const path = require('path'),

get raw() {
return this._rawConfig;
}
/**

@@ -101,12 +97,2 @@ * @param {String} path

extend(config) {
return new Configuration(
{
...this.raw,
...config,
},
this._resolveBaseDir,
);
}
/**

@@ -113,0 +99,0 @@ * Override config properties using `LUSTER_CONF` environment variable.

@@ -97,13 +97,2 @@ const Terror = require('terror'),

/**
* @constructor
* @class LusterMasterError
* @augments LusterError
*/
errors.LusterMasterError = LusterError.create('LusterMasterError',
{
POOL_KEY_ALREADY_TAKEN:
'Pool key "%key%" is already taken'
});
module.exports = errors;

@@ -20,6 +20,4 @@ const util = require('util'),

const key = this.eexKey;
console.log('%s(%s).emit(%s)', this.constructor.name || 'EventEmitterEx', this.wid, inspectedArgs);
console.log('%s(%s).emit(%s)', this.constructor.name || 'EventEmitterEx', key, inspectedArgs);
return EventEmitter.prototype.emit.apply(this, args);

@@ -26,0 +24,0 @@ };

@@ -1,11 +0,9 @@

const cluster = require('cluster');
const os = require('os'),
cluster = require('cluster'),
ClusterProcess = require('./cluster_process'),
WorkerWrapper = require('./worker_wrapper'),
Port = require('./port'),
RestartQueue = require('./restart_queue'),
RPC = require('./rpc');
const ClusterProcess = require('./cluster_process');
const LusterMasterError = require('./errors').LusterMasterError;
const RPC = require('./rpc');
const WorkerPool = require('./worker-pool');
const WorkerWrapper = require('./worker_wrapper');
const DEFAULT_POOL_KEY = '__default';
/**

@@ -21,2 +19,17 @@ * @constructor

/**
* @type {Object}
* @property {WorkerWrapper} *
* @public
* @todo make it private or public immutable
*/
this.workers = {};
/**
* Workers restart queue.
* @type {RestartQueue}
* @private
*/
this._restartQueue = new RestartQueue();
/**
* Configuration object to pass to cluster.setupMaster()

@@ -28,10 +41,9 @@ * @type {Object}

this.pools = new Map();
this.createPool(DEFAULT_POOL_KEY);
this.id = 0;
this.wid = 0;
this.eexKey = 0;
this.pid = process.pid;
this.on('worker state', this._cleanupUnixSockets.bind(this));
this.on('worker exit', this._checkWorkersAlive.bind(this));
// @todo make it optional?

@@ -42,22 +54,2 @@ process.on('SIGINT', this._onSignalQuit.bind(this));

createPool(key) {
if (this.pools.has(key)) {
throw LusterMasterError.createError(
LusterMasterError.CODES.POOL_KEY_ALREADY_TAKEN,
{key}
);
}
this.emit('create pool', key);
const pool = new WorkerPool(key, this);
this._proxyWorkerEvents(pool);
pool.on('shutdown', this._checkPoolsAlive.bind(this));
this.pools.set(key, pool);
return pool;
}
getPool(key) {
return this.pools.get(key);
}
/**

@@ -86,10 +78,43 @@ * Allows same object structure as cluster.setupMaster().

/**
* Remove not used unix socket before worker will try to listen it.
* @param {WorkerWrapper} worker
* @param {WorkerWrapperState} state
* @private
*/
_cleanupUnixSockets(worker, state) {
const port = worker.options.port;
if (this._restartQueue.has(worker) ||
state !== WorkerWrapper.STATES.LAUNCHING ||
port.family !== Port.UNIX) {
return;
}
const inUse = this.getWorkersArray().some(w =>
worker.wid !== w.wid &&
w.isRunning() &&
port.isEqualTo(w.options.port)
);
if (!inUse) {
port.unlink(err => {
if (err) {
this.emit('error', err);
}
});
}
}
/**
* Check for alive workers, if no one here, then emit "shutdown".
* @private
*/
_checkPoolsAlive() {
let dead = true;
this.forEachPool(pool => dead = dead && pool.dead);
_checkWorkersAlive() {
const workers = this.getWorkersArray(),
alive = workers.reduce(
(count, w) => w.dead ? count - 1 : count,
workers.length
);
if (dead) {
if (alive === 0) {
this.emit('shutdown');

@@ -100,12 +125,13 @@ }

/**
* Repeat WorkerWrapper events from WorkerPool on Master
* Repeat WorkerWrapper events on Master and add 'worker ' prefix to event names
* so for example 'online' became 'worker online'
* @private
* @param {WorkerPool} pool
* @param {WorkerWrapper} worker
*/
_proxyWorkerEvents(pool) {
for (const eventName of WorkerWrapper.EVENTS) {
const proxyEventName = 'worker ' + eventName;
pool.on(proxyEventName, this.emit.bind(this, proxyEventName));
}
_proxyWorkerEvents(worker) {
WorkerWrapper.EVENTS
.forEach(eventName => {
const proxyEventName = 'worker ' + eventName;
worker.on(eventName, this.emit.bind(this, proxyEventName, worker));
});
}

@@ -117,3 +143,7 @@

getWorkersIds() {
return this.getWorkersArray().map(w => w.wid);
if (!this._workersIdsCache) {
this._workersIdsCache = this.getWorkersArray().map(w => w.wid);
}
return this._workersIdsCache;
}

@@ -125,13 +155,24 @@

getWorkersArray() {
let result = [];
this.forEachPool(
pool => result = result.concat(pool.getWorkersArray())
);
return result;
if (!this._workersArrayCache) {
this._workersArrayCache = Object.values(this.workers);
}
return this._workersArrayCache;
}
forEachPool(fn) {
for (const pool of this.pools.values()) {
fn(pool);
}
/**
* Add worker to the pool
* @param {WorkerWrapper} worker
* @returns {Master} self
* @public
*/
add(worker) {
// invalidate Master#getWorkersIds and Master#getWorkersArray cache
this._workersIdsCache = null;
this._workersArrayCache = null;
this.workers[worker.wid] = worker;
this._proxyWorkerEvents(worker);
return this;
}

@@ -149,5 +190,4 @@

forEach(fn) {
this.forEachPool(pool => {
pool.forEach(fn);
});
this.getWorkersArray()
.forEach(fn);

@@ -183,3 +223,43 @@ return this;

this.pools.get(DEFAULT_POOL_KEY).configure(this.config);
const // WorkerWrapper options
forkTimeout = this.config.get('control.forkTimeout'),
stopTimeout = this.config.get('control.stopTimeout'),
exitThreshold = this.config.get('control.exitThreshold'),
allowedSequentialDeaths = this.config.get('control.allowedSequentialDeaths'),
count = this.config.get('workers', os.cpus().length),
isServerPortSet = this.config.has('server.port'),
groups = this.config.get('server.groups', 1),
workersPerGroup = Math.floor(count / groups);
let port,
// workers and groups count
i = 0,
group = 0,
workersInGroup = 0;
if (isServerPortSet) {
port = new Port(this.config.get('server.port'));
}
// create pool of workers
while (count > i++) {
this.add(new WorkerWrapper(this, {
forkTimeout,
stopTimeout,
exitThreshold,
allowedSequentialDeaths,
port: isServerPortSet ? port.next(group) : 0,
maxListeners: this.getMaxListeners(),
}));
// groups > 1, current group is full and
// last workers can form at least more one group
if (groups > 1 &&
++workersInGroup >= workersPerGroup &&
count - (group + 1) * workersPerGroup >= workersPerGroup) {
workersInGroup = 0;
group++;
}
}
}

@@ -194,8 +274,19 @@

waitForWorkers(wids, event) {
const promises = [];
this.forEachPool(
pool => promises.push(pool.waitForWorkers(wids, event))
);
const pendingWids = new Set(wids);
return Promise.all(promises);
return new Promise(resolve => {
if (pendingWids.size === 0) {
resolve();
}
const onWorkerState = worker => {
const wid = worker.wid;
pendingWids.delete(wid);
if (pendingWids.size === 0) {
this.removeListener(event, onWorkerState);
resolve();
}
};
this.on(event, onWorkerState);
});
}

@@ -225,3 +316,3 @@

// TODO maybe run this after starting waitForAllWorkers
this.forEachPool(pool => pool.restart());
this.forEach(worker => worker.restart());

@@ -245,12 +336,2 @@ await this.waitForAllWorkers('worker ready');

async _softRestart() {
const promises = [];
this.forEachPool(
pool => promises.push(pool.softRestart())
);
await Promise.all(promises);
this.emit('restarted');
}
/**

@@ -265,3 +346,4 @@ * Workers will be restarted one by one using RestartQueue.

softRestart() {
this._softRestart();
this.forEach(worker => worker.softRestart());
this._restartQueue.once('drain', this.emit.bind(this, 'restarted'));
return this;

@@ -271,2 +353,15 @@ }

/**
* Schedules one worker restart using RestartQueue.
* If a worker becomes dead, it will be just removed from restart queue. However, if already dead worker is pushed
* into the queue, it will emit 'error' on restart.
* @public
* @param {WorkerWrapper} worker
* @returns {Master} self
*/
scheduleWorkerRestart(worker) {
this._restartQueue.push(worker);
return this;
}
/**
* @override

@@ -288,3 +383,11 @@ * @see ClusterProcess

remoteCallToAll(name, ...args) {
this.forEachPool(pool => pool.remoteCallToAll(name, ...args));
this.forEach(worker => {
if (worker.ready) {
worker.remoteCall(name, ...args);
} else {
worker.on('ready', () => {
worker.remoteCall(name, ...args);
});
}
});
}

@@ -300,3 +403,7 @@

broadcastEventToAll(event, ...args) {
this.forEachPool(pool => pool.broadcastEventToAll(event, ...args));
this.forEach(worker => {
if (worker.ready) {
worker.broadcastEvent(event, ...args);
}
});
}

@@ -313,3 +420,3 @@

this.emit(event, ...args);
this.forEachPool(pool => pool.emitToAll(event, ...args));
this.broadcastEventToAll(event, ...args);
}

@@ -322,9 +429,16 @@

async _shutdown() {
const promises = [];
this.forEachPool(
pool => promises.push(pool._shutdown())
const stoppedWorkers = [];
this.forEach(worker => {
if (worker.isRunning()) {
worker.stop();
stoppedWorkers.push(worker.wid);
}
});
await this.waitForWorkers(
stoppedWorkers,
'worker exit',
);
await Promise.all(promises);
this.emit('shutdown');

@@ -353,3 +467,7 @@ }

remoteCallToAllWithCallback(opts) {
this.forEachPool(pool => pool.remoteCallToAllWithCallback(opts));
this.forEach(worker => {
if (worker.isRunning()) {
worker.remoteCallWithCallback(opts);
}
});
}

@@ -363,3 +481,3 @@

// TODO maybe run this after starting waitForAllWorkers
this.forEachPool(pool => pool.run());
this.forEach(worker => worker.run());

@@ -366,0 +484,0 @@ await this.waitForAllWorkers('worker ready');

@@ -18,3 +18,3 @@ const EventEmitterEx = require('./event_emitter_ex'),

class RestartQueue extends EventEmitterEx {
constructor(eexKey = undefined) {
constructor() {
super();

@@ -27,4 +27,2 @@

this._queue = [];
this.eexKey = eexKey;
}

@@ -31,0 +29,0 @@

@@ -38,3 +38,2 @@ const cluster = require('cluster'),

this.port = options.port;
this.workerEnv = options.workerEnv;
}

@@ -71,3 +70,3 @@

* @augments EventEmitterEx
* @param {WorkerPool} pool
* @param {Master} master
* @param {WorkerWrapperOptions} options

@@ -81,3 +80,3 @@ *

class WorkerWrapper extends EventEmitterEx {
constructor(pool, options) {
constructor(master, options) {
super();

@@ -107,3 +106,2 @@

this._wid = ++nextId;
this.eexKey = this._wid;

@@ -160,6 +158,6 @@ /**

/**
* @type {WorkerPool}
* @type {Master}
* @private
*/
this._pool = pool;
this._master = master;

@@ -176,3 +174,3 @@ /**

WorkerWrapper._RPC_EVENTS.forEach(event => {
pool.master.on('received worker ' + event, this.eventTranslator.bind(this, event));
master.on('received worker ' + event, WorkerWrapper.createEventTranslator(event).bind(this));
});

@@ -215,12 +213,2 @@ this.on('ready', this._onReady.bind(this));

/**
* Pool this WorkerWrapper belongs to
* @memberOf {WorkerWrapper}
* @public
* @readonly
*/
get pool() {
return this._pool;
}
/**
* @see WorkerWrapper.STATES for possible `state` argument values

@@ -239,6 +227,8 @@ * @param {WorkerWrapperState} state

eventTranslator(event, worker) {
if (this._worker && worker.id === this._worker.id) {
this.emit(event);
}
static createEventTranslator(event) {
return /** @this {WorkerWrapper} */function(worker) {
if (this._worker && worker.id === this._worker.id) {
this.emit(event);
}
};
}

@@ -402,6 +392,6 @@

/** @private */
this._worker = cluster.fork(Object.assign({
this._worker = cluster.fork({
port: this.options.port,
LUSTER_WID: this.wid,
}, this.options.workerEnv));
});

@@ -571,7 +561,7 @@ /** @private */

/**
* Adds this worker to pool's restart queue
* Adds this worker to master's restart queue
* @public
*/
softRestart() {
this._pool.scheduleWorkerRestart(this);
this._master.scheduleWorkerRestart(this);
}

@@ -578,0 +568,0 @@ }

@@ -7,4 +7,2 @@ const cluster = require('cluster'),

const pEvent = require('p-event');
const wid = parseInt(process.env.LUSTER_WID, 10);

@@ -21,7 +19,9 @@

this.eexKey = wid;
const broadcastEvent = this._broadcastEvent;
this._foreignPropertiesReceivedPromise = pEvent(this, 'foreign properties received');
this._foreignPropertiesReceivedPromise = new Promise(resolve => {
this.once('foreign properties received', () => {
resolve();
});
});

@@ -147,9 +147,3 @@ this.on('configured', broadcastEvent.bind(this, 'configured'));

try {
require(workerBase);
} catch (e) {
console.error(`Worker failed on require ${workerBase}`);
console.error(e);
process.exit(1);
}
require(workerBase);
this.emit('loaded', workerBase);

@@ -156,0 +150,0 @@

{
"name": "luster",
"version": "3.0.0-alpha.1",
"version": "3.0.0-alpha.2",
"description": "Node.js cluster wrapper",

@@ -39,4 +39,2 @@ "main": "./lib/luster.js",

"dependencies": {
"delay": "^3.0.0",
"p-event": "^2.1.0",
"terror": "^1.0.0"

@@ -46,3 +44,2 @@ },

"chai": "^3.5.0",
"chai-as-promised": "^7.1.1",
"eslint": "^4.19.1",

@@ -49,0 +46,0 @@ "eslint-config-nodules": "^0.4.0",

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc