Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

think-cluster

Package Overview
Dependencies
Maintainers
2
Versions
27
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

think-cluster - npm Package Compare versions

Comparing version 1.0.8 to 1.0.9

6

index.js
const util = require('./lib/util.js');
const messenger = require('./lib/messenger.js');
const Messenger = require('./lib/messenger.js');

@@ -9,4 +9,4 @@ exports.Worker = require('./lib/worker.js');

exports.messenger = new messenger();
exports.messenger = new Messenger();
exports.isAgent = util.isAgent;
exports.isFirstWorker = util.isFirstWorker;
exports.isFirstWorker = util.isFirstWorker;
const cluster = require('cluster');
const util = require('./util.js');
const net = require('net');
const stringHash = require('string-hash');

@@ -7,2 +9,14 @@ let waitReloadWorkerTimes = 0;

/**
* default options
*/
const defaultOptions = {
port: 0, // server listen port
host: '', // server listen host
sticky: false, // sticky cluster
getRemoteAddress: socket => socket.remoteAddress,
workers: 0, // fork worker nums
reloadSignal: '' // reload workers signal
};
/**
* Master class

@@ -16,3 +30,4 @@ */

constructor(options) {
this.options = util.parseOptions(options);
options = util.parseOptions(options);
this.options = Object.assign({}, defaultOptions, options);
}

@@ -27,14 +42,12 @@ /**

const worker = cluster.workers[id];
if (!this.isAliveWorker(worker)) continue;
if (!this.isAliveWorker(worker) || util.isAgent(worker)) continue;
worker.send(util.THINK_RELOAD_SIGNAL);
}
};
if (signal) {
process.on(signal, reloadWorkers);
}
if (signal) process.on(signal, reloadWorkers);
// if receive message `think-cluster-reload-workers` from worker, restart all workers
cluster.on('message', (worker, message) => {
if (message === 'think-cluster-reload-workers') {
reloadWorkers();
}
if (message !== 'think-cluster-reload-workers') return;
reloadWorkers();
});

@@ -47,3 +60,4 @@ }

isAliveWorker(worker) {
if (worker.state === 'disconnected' || worker.needKilled) {
const state = worker.state;
if (state === 'disconnected' || state === 'dead' || worker.needKilled) {
return false;

@@ -109,3 +123,3 @@ }

/**
* force reload all workers, in development env
* force reload all workers when code is changed, in development env
*/

@@ -126,5 +140,10 @@ forceReloadWorkers() {

if (!aliveWorkers.length) return;
if (aliveWorkers.length > this.options.workers) {
// check alive workers has leak
let allowWorkers = this.options.workers;
if (this.options.enableAgent) allowWorkers++;
if (aliveWorkers.length > allowWorkers) {
console.error(`workers fork has leak, alive workers: ${aliveWorkers.length}, need workers: ${this.options.workers}`);
}
const firstWorker = aliveWorkers.shift();

@@ -148,4 +167,37 @@ const promise = util.forkWorker(this.getForkEnv()).then(() => {

}
/**
* create server with sticky
* https://github.com/uqee/sticky-cluster
*/
createServer() {
const deferred = think.defer();
const server = net.createServer({pauseOnConnect: true}, socket => {
const remoteAddress = this.options.getRemoteAddress(socket) || '';
const index = stringHash(remoteAddress) % this.options.workers;
let idx = -1;
for (const id in cluster.workers) {
const worker = cluster.workers[id];
if (!this.isAliveWorker(worker) || util.isAgent(worker)) continue;
if (index === ++idx) {
worker.send(util.THINK_STICKY_CLUSTER, socket);
break;
}
}
});
server.listen(this.options.port, this.options.host, () => {
this.forkWorkers().then(() => {
deferred.resolve();
});
});
return deferred.promise;
}
/**
* start server, support sticky
*/
startServer() {
if (!this.options.sticky) return this.forkWorkers();
return this.createServer();
}
}
module.exports = Master;

@@ -15,2 +15,3 @@ const cluster = require('cluster');

exports.THINK_AGENT_CLOSED = 'think-agent-closed';
exports.THINK_STICKY_CLUSTER = 'think-sticky-cluster';

@@ -17,0 +18,0 @@ exports.PIN = 'PIN';

const util = require('./util.js');
const cluster = require('cluster');
const assert = require('assert');
const helper = require('think-helper');
const AgentClient = require('./agent_client.js');

@@ -12,7 +12,9 @@

const defaultOptions = {
debug: false,
logger: console.error.bind(console),
disableKeepAlive: false, // disabled connect keep alive
onUncaughtException: () => {}, // onUncaughtException event handle
onUnhandledRejection: () => {}, // onUnhandledRejection event handle
port: 0,
host: '',
sticky: false,
createServer: () => {},
logger: () => {},
onUncaughtException: () => false, // onUncaughtException event handle
onUnhandledRejection: () => false, // onUnhandledRejection event handle
processKillTimeout: 10 * 1000 // 10s

@@ -108,6 +110,7 @@ };

errTimes++;
this.options.onUncaughtException(err);
this.options.logger(`uncaughtException, times: ${errTimes}, pid: ${process.pid}`);
this.options.logger(err.stack);
if (errTimes === 1 && !this.options.debug) {
this.options.logger(err);
const flag = this.options.onUncaughtException(err);
if (errTimes === 1 && flag) {
this.disconnectWorker(true);

@@ -124,16 +127,42 @@ }

rejectTimes++;
this.options.onUnhandledRejection(err);
this.options.logger(`unhandledRejection, times: ${rejectTimes}, pid: ${process.pid}`);
this.options.logger(err);
const flag = this.options.onUnhandledRejection(err);
if (rejectTimes === 1 && flag) {
this.disconnectWorker(true);
}
});
}
/**
* listen port
*/
listen() {
const deferred = helper.defer();
this.server = this.options.createServer();
if (!this.options.sticky) {
this.server.listen(this.options.port, this.options.host, () => {
deferred.resolve();
});
} else {
process.on('message', (message, socket) => {
if (message !== util.THINK_STICKY_CLUSTER) return;
// emulate a connection event on the server by emitting the
// event with the connection master sent to us
this.server.emit('connection', socket);
// resume as we already catched the conn
socket.resume();
});
// start on random port, accept conn from this host only
this.server.listen(0, '127.0.0.1', () => {
deferred.resolve();
});
}
return deferred.promise;
}
/**
* capture events
*/
captureEvents() {
assert(this.options.server, 'options.server required');
this.uncaughtException();
this.unhandledRejection();
if (this.options.disableKeepAlive) {
this.disableKeepAlive();
}
this.captureReloadSignal();

@@ -143,2 +172,9 @@ AgentClient.getInstance();

/**
* start server
*/
startServer() {
this.captureEvents();
return this.listen();
}
/**
* get workers

@@ -145,0 +181,0 @@ */

{
"name": "think-cluster",
"description": "Cluster manage for ThinkJS",
"version": "1.0.8",
"version": "1.0.9",
"author": {

@@ -23,2 +23,4 @@ "name": "welefen",

"dependencies": {
"debug": "^2.6.8",
"string-hash": "^1.1.3",
"think-helper": "^1.0.0"

@@ -29,3 +31,3 @@ },

"eslint": "^4.2.0",
"eslint-config-eslint": "^4.0.0",
"eslint-config-think": "^1.0.2",
"mock-require": "^2.0.1",

@@ -32,0 +34,0 @@ "nyc": "^7.0.0"

@@ -16,3 +16,5 @@ const test = require('ava');

test.serial('normal case', async t => {
process.env.THINK_ENABLE_AGENT = false;
const cls = delegate(MyClass);

@@ -28,3 +30,2 @@ let instance = new cls();

const cls = delegate(MyClass);
let obj = new cls();
t.is(cls.name,'MyClass')

@@ -36,6 +37,2 @@ });

//
// constructor(){
// console.log('myclass constructor')
// }
//
// static delegateMethods(){

@@ -45,9 +42,7 @@ // return ['get','set']

//
// get(name){
// console.log('get')
// }
// get(name){}
//
// set(name,value){}
// }
//
// function delegate(cls){

@@ -58,3 +53,2 @@ // let delegateCls = class delegateCls extends cls {};

// delegateCls.prototype.constructor = function(){
// console.log('constructor')
// cArgs = arguments;

@@ -74,21 +68,4 @@ // delegateCls.apply(this, arguments);

//
// let delegateCls = delegate(MyClass);
// let d = delegate(MyClass);
//
// const ins = new delegateCls();
//
// ins.get();
// class childClass extends MyClass{
// constructor(){
// console.log('child constructor')
// }
// }
//
// new childClass();
// console.log(new d())

@@ -62,2 +62,3 @@ const test = require('ava');

},
on: () => {},
trigger(evtName,args){

@@ -124,3 +125,3 @@ this.workers.forEach(worker => {

cluster.trigger('listening')
t.is(cluster.workers.length,require('os').cpus().length+1)
t.is(cluster.workers.length, require('os').cpus().length+1)
});

@@ -127,0 +128,0 @@

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc