New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

think-cluster

Package Overview
Dependencies
Maintainers
1
Versions
27
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

think-cluster - npm Package Compare versions

Comparing version 1.0.5 to 1.0.6

test/mock-test/delegate.js

74

lib/agent_client.js

@@ -15,8 +15,8 @@ const util = require('./util.js');

*/
constructor(){
constructor() {
this.client = null;
this.tasks = {};
//get agent server address
// get agent server address
process.on('message', message => {
if(message && message.act === util.THINK_AGENT_OPTIONS){
if (message && message.act === util.THINK_AGENT_OPTIONS) {
debug(`receive agent worker address: ${JSON.stringify(message.address)}, pid:${process.pid}`);

@@ -30,3 +30,3 @@ this.createConnection(message.address);

*/
get status(){
get status() {
return this[STATUS] || 'waiting';

@@ -37,14 +37,14 @@ }

*/
set status(status){
if(this.status === status) return;
set status(status) {
if (this.status === status) return;
this[STATUS] = status;
if(status === 'connected'){
if (status === 'connected') {
this.captureData();
for(let taskId in this.tasks){
let data = this.tasks[taskId].data;
for (const taskId in this.tasks) {
const data = this.tasks[taskId].data;
this.sendData(data);
}
}else if(status === 'closed'){
//process.emit(util.THINK_AGENT_CLOSED);
//this.doLeaveTask();
} else if (status === 'closed') {
// process.emit(util.THINK_AGENT_CLOSED);
// this.doLeaveTask();
}

@@ -55,7 +55,7 @@ }

*/
doLeaveTask(){
for(let taskId in this.tasks){
let item = this.tasks[taskId];
let options = item.options;
let args = item.data.mArgs;
doLeaveTask() {
for (const taskId in this.tasks) {
const item = this.tasks[taskId];
const options = item.options;
const args = item.data.mArgs;
options.method.apply(options.ctx, args).then(data => {

@@ -72,4 +72,4 @@ item.resolve(data);

*/
captureData(){
//let pinTimes = 0;
captureData() {
// let pinTimes = 0;
this.client.on('data', data => {

@@ -95,13 +95,13 @@ // if(data === util.PIN){

*/
handleData(data){
try{
handleData(data) {
try {
data = JSON.parse(data);
}catch(err){
} catch (err) {
return;
}
let deferred = this.tasks[data.taskId];
if(!deferred) return;
if(data.err){
const deferred = this.tasks[data.taskId];
if (!deferred) return;
if (data.err) {
deferred.reject(new Error(data.err));
}else{
} else {
deferred.resolve(data.data);

@@ -114,3 +114,3 @@ }

*/
get isConnected(){
get isConnected() {
return this.status === 'connected';

@@ -121,3 +121,3 @@ }

*/
get isClosed(){
get isClosed() {
return this.status === 'closed';

@@ -128,3 +128,3 @@ }

*/
createConnection(options){
createConnection(options) {
const client = net.connect(options, () => {

@@ -151,3 +151,3 @@ debug(`connect agent server success, pid: ${process.pid}`);

*/
sendData(data){
sendData(data) {
this.client.write(JSON.stringify(data));

@@ -159,10 +159,10 @@ }

*/
send(data, options){
send(data, options) {
const taskId = helper.uuid().slice(0, 8);
data.taskId = taskId;
let deferred = helper.defer();
const deferred = helper.defer();
deferred.options = options;
if(this.isConnected){
if (this.isConnected) {
this.sendData(data);
}else{
} else {
deferred.data = data;

@@ -176,4 +176,4 @@ }

*/
static getInstance(){
if(this[INSTANCE]) return this[INSTANCE];
static getInstance() {
if (this[INSTANCE]) return this[INSTANCE];
this[INSTANCE] = new AgentClient();

@@ -184,2 +184,2 @@ return this[AgentClient];

module.exports = AgentClient;
module.exports = AgentClient;

@@ -9,3 +9,3 @@ const assert = require('assert');

*/
let delegateClass = {};
const delegateClass = {};

@@ -20,3 +20,3 @@ /**

*/
constructor(options){
constructor(options) {
this.options = util.parseOptions(options);

@@ -28,13 +28,13 @@ }

*/
handleTask(data){
let cls = delegateClass[data.classId];
assert(cls, 'can not find class, classId: ${data.classId}');
handleTask(data) {
const Cls = delegateClass[data.classId];
assert(Cls, `can not find class, classId: ${data.classId}`);
assert(helper.isArray(data.cArgs), '.cArgs must be an array');
const instance = new cls(...data.cArgs);
assert(cls[data.method], `class method ${data.method} not exist`);
const instance = new Cls(...data.cArgs);
assert(Cls[data.method], `class method ${data.method} not exist`);
assert(data.mArgs, '.mArgs must be an array');
try{
let ret = instance[data.method](...data.mArgs);
try {
const ret = instance[data.method](...data.mArgs);
return Promise.resolve(ret);
}catch(err){
} catch (err) {
return Promise.reject(err);

@@ -47,12 +47,12 @@ }

*/
handleServerData(data, client){
if(data === util.PIN){
handleServerData(data, client) {
if (data === util.PIN) {
return client.write(util.PIN);
}
try{
try {
data = JSON.parse(data);
}catch(err){
} catch (err) {
return;
}
if(!data.taskId || !data.classId || !data.method) return;
if (!data.taskId || !data.classId || !data.method) return;
this.handleTask(data).then(result => {

@@ -68,3 +68,3 @@ client.write(JSON.stringify({

}));
})
});
}

@@ -74,3 +74,3 @@ /**

*/
createServer(){
createServer() {
const server = net.createServer(client => {

@@ -92,4 +92,4 @@ client.on('data', data => {

*/
static register(classId, cls){
if(delegateClass[classId]) return false;
static register(classId, cls) {
if (delegateClass[classId]) return false;
delegateClass[classId] = cls;

@@ -96,0 +96,0 @@ return true;

@@ -9,17 +9,17 @@ const helper = require('think-helper');

function delegate(cls, classId){
function delegate(cls, classId) {
assert(cls && helper.isFunction(cls), `delegate object required and must be a function`);
classId = classId || helper.md5(cls).slice(0, 8);
//in agent worker, not need delegate methods
if(util.isAgent()){
// in agent worker, not need delegate methods
if (util.isAgent()) {
Agent.register(classId, cls);
return cls;
}
//agent worker is not enabled
if(!util.enableAgent()) return cls;
// agent worker is not enabled
if (!util.enableAgent()) return cls;
let delegateCls = class delegateCls extends cls {};
const delegateCls = class delegateCls extends cls {};
let methods = delegateCls.delegateMethods;
if(helper.isFunction(methods)){
if (helper.isFunction(methods)) {
methods = methods();

@@ -30,12 +30,12 @@ }

let cArgs = null;
delegateCls.prototype.constructor = function(){
delegateCls.prototype.constructor = function() {
cArgs = arguments;
delegateCls.apply(this, arguments);
}
};
methods.forEach(method => {
assert(helper.isFunction(delegateCls.prototype[method]), `.${method} is not a function`);
let methodFn = delegateCls.prototype[method];
delegateCls.prototype[method] = function(){
//if agent client is closed, run method directly
if(agentClientInstance.isClosed){
const methodFn = delegateCls.prototype[method];
delegateCls.prototype[method] = function() {
// if agent client is closed, run method directly
if (agentClientInstance.isClosed) {
return methodFn.apply(this, arguments);

@@ -45,7 +45,7 @@ };

classId,
cArgs, //constructor arguments
cArgs, // constructor arguments
method,
mArgs: arguments //method arguments
mArgs: arguments // method arguments
}, {ctx: this, method: methodFn});
}
};
});

@@ -55,2 +55,2 @@ return delegateCls;

module.exports = delegate;
module.exports = delegate;
const cluster = require('cluster');
const util = require('./util.js');
//const helper = require('think-helper');
//const debug = require('debug')('think-cluster');
// const helper = require('think-helper');
// const debug = require('debug')('think-cluster');

@@ -16,3 +16,3 @@ let waitReloadWorkerTimes = 0;

*/
constructor(options){
constructor(options) {
this.options = util.parseOptions(options);

@@ -23,7 +23,8 @@ }

*/
captureReloadSignal(){
captureReloadSignal() {
const signal = this.options.reloadSignal;
process.on(signal, () => {
for(let id in cluster.workers){
let worker = cluster.workers[id];
for (const id in cluster.workers) {
const worker = cluster.workers[id];
if (!this.isAliveWorker(worker)) continue;
worker.send(util.THINK_RELOAD_SIGNAL);

@@ -33,9 +34,15 @@ }

}
isAliveWorker(worker) {
if (worker.state === 'disconnected' || worker.needKilled) {
return false;
}
return true;
}
/**
* get fork env
*/
getForkEnv(){
getForkEnv() {
return {
THINK_WORKERS: this.options.workers, //workers num
}
THINK_WORKERS: this.options.workers // workers num
};
}

@@ -45,3 +52,3 @@ /**

*/
forkAgentWorker(){
forkAgentWorker() {
return util.forkWorker({

@@ -54,11 +61,11 @@ THINK_AGENT_WORKER: 1

*/
forkWorkers(){
forkWorkers() {
const forkWorker = (env = {}, address) => {
let workers = this.options.workers;
const workers = this.options.workers;
let index = 0;
let promises = [];
while(index++ < workers){
const promises = [];
while (index++ < workers) {
env = Object.assign(env, this.getForkEnv());
let promise = util.forkWorker(env).then(data => {
if(address){
const promise = util.forkWorker(env).then(data => {
if (address) {
data.worker.send({act: util.THINK_AGENT_OPTIONS, address});

@@ -70,7 +77,7 @@ }

return Promise.all(promises);
}
if(this.options.reloadSignal){
};
if (this.options.reloadSignal) {
this.captureReloadSignal();
}
if(this.options.enableAgent){
if (this.options.enableAgent) {
return this.forkAgentWorker().then(data => {

@@ -85,16 +92,16 @@ return forkWorker({THINK_ENABLE_AGENT: 1}, data.address);

*/
killWorker(worker, reload){
if(reload) worker.hasGracefulReload = true;
worker.kill('SIGINT'); //windows don't support SIGQUIT
killWorker(worker, reload) {
if (reload) worker.hasGracefulReload = true;
worker.kill('SIGINT'); // windows don't support SIGQUIT
worker.needKilled = true;
setTimeout(function () {
if(!worker.isConnected()) return;
setTimeout(function() {
if (!worker.isConnected()) return;
worker.process.kill('SIGINT');
}, 100);
}
/**
/**
* force reload all workers, in development env
*/
forceReloadWorkers(){
if(waitReloadWorkerTimes){
forceReloadWorkers() {
if (waitReloadWorkerTimes) {
waitReloadWorkerTimes++;

@@ -105,12 +112,10 @@ return;

let aliveWorkers = [];
for(let id in cluster.workers){
let worker = cluster.workers[id];
if(worker.state === 'disconnected' || worker.needKilled){
continue;
}
const aliveWorkers = [];
for (const id in cluster.workers) {
const worker = cluster.workers[id];
if (!this.isAliveWorker(worker)) continue;
aliveWorkers.push(worker);
}
if(!aliveWorkers.length) return;
if(aliveWorkers.length > this.options.workers){
if (!aliveWorkers.length) return;
if (aliveWorkers.length > this.options.workers) {
console.error(`workers fork has leak, alive workers: ${aliveWorkers.length}, need workers: ${this.options.workers}`);

@@ -120,3 +125,3 @@ }

const promise = util.forkWorker(this.getForkEnv()).then(() => {
//http://man7.org/linux/man-pages/man7/signal.7.html
// http://man7.org/linux/man-pages/man7/signal.7.html
this.killWorker(firstWorker, true);

@@ -129,3 +134,3 @@ return aliveWorkers.map(worker => {

return promise.then(() => {
if(waitReloadWorkerTimes > 1){
if (waitReloadWorkerTimes > 1) {
waitReloadWorkerTimes = 0;

@@ -140,2 +145,2 @@ this.forceReloadWorkers();

module.exports = Master;
module.exports = Master;

@@ -14,3 +14,3 @@ const cluster = require('cluster');

class Messenger extends events {
constructor(){
constructor() {
super();

@@ -24,7 +24,7 @@ this.bindEvent();

*/
getWorkers(type = 'all', cWorker){
let workers = [];
for(let id in cluster.workers){
let worker = cluster.workers[id];
switch(type){
getWorkers(type = 'all', cWorker) {
const workers = [];
for (const id in cluster.workers) {
const worker = cluster.workers[id];
switch (type) {
case 'all':

@@ -34,13 +34,13 @@ workers.push(worker);

case 'app':
if(!worker.isAgent) workers.push(worker);
if (!worker.isAgent) workers.push(worker);
break;
case 'agent':
if(worker.isAgent) workers.push(worker);
if (worker.isAgent) workers.push(worker);
break;
case 'one':
if(!workers.length) workers.push(worker);
if (!workers.length) workers.push(worker);
break;
}
}
if(type === 'one' && workers[0] !== cWorker) return [];
if (type === 'one' && workers[0] !== cWorker) return [];
return workers;

@@ -52,16 +52,16 @@ }

*/
bindEvent(){
if(process[MessengerInit]) return;
bindEvent() {
if (process[MessengerInit]) return;
process[MessengerInit] = true;
if(cluster.isMaster){
if (cluster.isMaster) {
cluster.on('message', (worker, message) => {
if(message && message.act === MESSENGER){
let workers = this.getWorkers(message.target, worker);
if (message && message.act === MESSENGER) {
const workers = this.getWorkers(message.target, worker);
workers.forEach(worker => worker.send(message));
}
})
}else{
});
} else {
process.on('message', message => {
if(message && message.act === MESSENGER){
if (message && message.act === MESSENGER) {
this.emit(message.action, message.data);

@@ -76,4 +76,4 @@ }

*/
setTimeout(actionName, timeout = 3000){
setTimeout(() => process.emit(actionName, new Error('timeout')), timeout);
setTimeout(actionName, timeout = 3000) {
setTimeout(() => this.emit(actionName, new Error('timeout')), timeout);
}

@@ -86,7 +86,7 @@ /**

*/
broadcast(action, data){
broadcast(action, data) {
process.send({
act: MESSENGER,
action,
data,
act: MESSENGER,
action,
data,
target: 'all'

@@ -100,7 +100,7 @@ });

*/
runInOne(callback){
let id = taskId++;
let actionName = `think-messenger-${id}`;
runInOne(callback) {
const id = taskId++;
const actionName = `think-messenger-${id}`;
process.send({
act: MESSENGER,
act: MESSENGER,
action: actionName,

@@ -110,3 +110,3 @@ target: 'one'

this.once(actionName, data => {
if(!helper.isError(data) && callback){
if (!helper.isError(data) && callback) {
callback();

@@ -119,2 +119,2 @@ }

module.exports = Messenger;
module.exports = Messenger;

@@ -21,5 +21,5 @@ const cluster = require('cluster');

*/
exports.isFirstWorker = function(){
exports.isFirstWorker = function() {
return +process.env.THINK_PROCESS_ID === 1;
}
};

@@ -29,21 +29,21 @@ /**

*/
exports.isAgent = function(){
exports.isAgent = function() {
return !!process.env.THINK_AGENT_WORKER;
}
};
/**
* enable agent
*/
exports.enableAgent = function(){
return !! process.env.THINK_ENABLE_AGENT;
}
exports.enableAgent = function() {
return !!process.env.THINK_ENABLE_AGENT;
};
/**
* parse options
*/
exports.parseOptions = function(options = {}){
exports.parseOptions = function(options = {}) {
options.workers = options.workers || cpus;
if(options.workers < 2){
if (options.workers < 2) {
options.enableAgent = false;
}
return options;
}
};

@@ -53,29 +53,31 @@ /**

*/
exports.forkWorker = function(env = {}){
let deferred = helper.defer();
exports.forkWorker = function(env = {}) {
const deferred = helper.defer();
env.THINK_PROCESS_ID = env.THINK_AGENT_WORKER ? 0 : thinkProcessId++;
const worker = cluster.fork(env);
if(env.THINK_AGENT_WORKER){
if (env.THINK_AGENT_WORKER) {
worker.isAgent = true;
}
worker.on('message', message => {
if(worker.hasGracefulReload) return;
if(message === exports.THINK_GRACEFUL_DISCONNECT){
debug(`refork worker, receive message 'think-graceful-disconnect', pid: ${process.pid}`);
if (worker.hasGracefulReload) return;
if (message === exports.THINK_GRACEFUL_DISCONNECT) {
debug(`refork worker, receive message 'think-graceful-disconnect'`);
worker.hasGracefulReload = true;
exports.forkWorker(env);
exports.forkWorker(env).then(() => {
worker.send(util.THINK_GRACEFUL_FORK);
});
}
});
worker.once('exit', (code, signal) => {
if(worker.hasGracefulReload) return;
debug(`worker exit, code:${code}, signal:${signal}, pid: ${process.pid}`);
if (worker.hasGracefulReload) return;
debug(`worker exit, code:${code}, signal:${signal}`);
exports.forkWorker(env);
});
worker.once('listening', address => {
if(worker.isAgent){
if (worker.isAgent) {
debug(`agent worker is listening, address:${JSON.stringify(address)}`);
//send agent server address to workers
for(let id in cluster.workers){
let item = cluster.workers[id];
if(item.isAgent) continue;
// send agent server address to workers
for (const id in cluster.workers) {
const item = cluster.workers[id];
if (item.isAgent) continue;
item.send({act: util.THINK_AGENT_OPTIONS, address});

@@ -87,2 +89,2 @@ }

return deferred.promise;
}
};

@@ -14,6 +14,6 @@ const util = require('./util.js');

logger: console.error.bind(console),
disableKeepAlive: false, //disabled connect keep alive
onUncaughtException: () => {},
onUnhandledRejection: () => {},
processKillTimeout: 10 * 1000 //10s
disableKeepAlive: false, // disabled connect keep alive
onUncaughtException: () => {}, // onUncaughtException event handle
onUnhandledRejection: () => {}, // onUnhandledRejection event handle
processKillTimeout: 10 * 1000 // 10s
};

@@ -28,3 +28,3 @@ /**

*/
constructor(options){
constructor(options) {
options = util.parseOptions(options);

@@ -36,4 +36,4 @@ this.options = Object.assign({}, defaultOptions, options);

*/
disableKeepAlive(){
if(this[KEEP_ALIVE]) return;
disableKeepAlive() {
if (this[KEEP_ALIVE]) return;
this[KEEP_ALIVE] = true;

@@ -52,3 +52,3 @@ const server = this.options.server;

*/
closeServer(){
closeServer() {
this.disableKeepAlive();

@@ -58,3 +58,3 @@ const logger = this.options.logger;

const killTimeout = this.options.processKillTimeout;
if(killTimeout){
if (killTimeout) {
const timer = setTimeout(() => {

@@ -67,7 +67,2 @@ logger(`process exit by killed(timeout: ${killTimeout}ms), pid: ${process.pid}`);

const worker = cluster.worker;
worker.on('disconnect', () => {
logger(`process exit by disconnect event, pid: ${process.pid}`);
process.exit(0);
});
const server = this.options.server;

@@ -77,3 +72,7 @@ logger(`start close server, pid: ${process.pid}, connections: ${server._connections}`);

logger(`server closed, pid: ${process.pid}`);
worker.disconnect();
try {
worker.disconnect();
} catch (e) {
logger(`already disconnect, pid:${process.pid}`);
}
});

@@ -85,12 +84,12 @@ }

*/
disconnectWorker(sendSignal){
disconnectWorker(sendSignal) {
const worker = cluster.worker;
if(sendSignal){
if (sendSignal) {
worker.send(util.THINK_GRACEFUL_DISCONNECT);
worker.once('message', message => {
if(message === util.THINK_GRACEFUL_FORK){
if (message === util.THINK_GRACEFUL_FORK) {
this.closeServer();
}
});
}else{
} else {
this.closeServer();

@@ -102,5 +101,5 @@ }

*/
captureReloadSignal(){
captureReloadSignal() {
process.once('message', message => {
if(message === util.THINK_RELOAD_SIGNAL){
if (message === util.THINK_RELOAD_SIGNAL) {
this.disconnectWorker(true);

@@ -113,3 +112,3 @@ }

*/
uncaughtException(){
uncaughtException() {
let errTimes = 0;

@@ -121,3 +120,3 @@ process.on('uncaughtException', err => {

this.options.logger(err.stack);
if(errTimes === 1 && !this.options.debug){
if (errTimes === 1 && !this.options.debug) {
this.disconnectWorker(true);

@@ -130,3 +129,3 @@ }

*/
unhandledRejection(){
unhandledRejection() {
let rejectTimes = 0;

@@ -142,7 +141,7 @@ process.on('unhandledRejection', err => {

*/
captureEvents(){
captureEvents() {
assert(this.options.server, 'options.server required');
this.uncaughtException();
this.unhandledRejection();
if(this.options.disableKeepAlive){
if (this.options.disableKeepAlive) {
this.disableKeepAlive();

@@ -156,3 +155,3 @@ }

*/
getWorkers(){
getWorkers() {
return process.env.THINK_WORKERS;

@@ -162,2 +161,2 @@ }

module.exports = Worker;
module.exports = Worker;
{
"name": "think-cluster",
"description": "Cluster manage for ThinkJS",
"version": "1.0.5",
"version": "1.0.6",
"author": {

@@ -11,2 +11,4 @@ "name": "welefen",

"test": "eslint lib/ && nyc ava test/mock-test/",
"lint": "eslint lib/",
"lint-fix": "eslint --fix lib/",
"coverage": "nyc report --reporter=html"

@@ -26,6 +28,5 @@ },

"ava": "^0.18.0",
"babel-core": "^6.22.1",
"babel-eslint": "^7.1.1",
"eslint": "^4.2.0",
"eslint-config-eslint": "^4.0.0",
"mock-require": "^2.0.1",
"eslint": "2.8.0",
"nyc": "^7.0.0"

@@ -32,0 +33,0 @@ },

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc