Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

think-cluster

Package Overview
Dependencies
Maintainers
5
Versions
27
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

think-cluster - npm Package Compare versions

Comparing version 1.0.11 to 1.1.0

3

index.js

@@ -5,8 +5,5 @@ const util = require('./lib/util.js');

exports.Worker = require('./lib/worker.js');
exports.Agent = require('./lib/agent.js');
exports.Master = require('./lib/master.js');
exports.delegate = require('./lib/delegate.js');
exports.messenger = new Messenger();
exports.isAgent = util.isAgent;
exports.isFirstWorker = util.isFirstWorker;

75

lib/master.js

@@ -38,7 +38,3 @@ const cluster = require('cluster');

const reloadWorkers = () => {
for (const id in cluster.workers) {
const worker = cluster.workers[id];
if (!this.isAliveWorker(worker) || util.isAgent(worker)) continue;
worker.send(util.THINK_RELOAD_SIGNAL);
}
util.getAliveWorkers().forEach(worker => worker.send(util.THINK_RELOAD_SIGNAL));
};

@@ -54,13 +50,2 @@ if (signal) process.on(signal, reloadWorkers);

/**
* check worker is alive
* @param {Object} worker
*/
isAliveWorker(worker) {
const state = worker.state;
if (state === 'disconnected' || state === 'dead' || worker.needKilled) {
return false;
}
return true;
}
/**
* get fork env

@@ -74,35 +59,15 @@ */

/**
* fork agent worker
*/
forkAgentWorker() {
return util.forkWorker({
THINK_AGENT_WORKER: 1
});
}
/**
* fork workers
*/
forkWorkers() {
const forkWorker = (env = {}, address) => {
const workers = this.options.workers;
let index = 0;
const promises = [];
while (index++ < workers) {
env = Object.assign(env, this.getForkEnv());
const promise = util.forkWorker(env).then(data => {
if (address) {
data.worker.send({act: util.THINK_AGENT_OPTIONS, address});
}
});
promises.push(promise);
}
return Promise.all(promises);
};
this.captureReloadSignal();
if (this.options.enableAgent) {
return this.forkAgentWorker().then(data => {
return forkWorker({THINK_ENABLE_AGENT: 1}, data.address);
});
const workers = this.options.workers;
let index = 0;
const promises = [];
while (index++ < workers) {
const env = Object.assign({}, this.getForkEnv());
const promise = util.forkWorker(env);
promises.push(promise);
}
return forkWorker();
return Promise.all(promises);
}

@@ -113,5 +78,5 @@ /**

killWorker(worker, reload) {
if (reload) worker.hasGracefulReload = true;
if (reload) worker[util.WORKER_REALOD] = true;
worker.kill('SIGINT'); // windows don't support SIGQUIT
worker.needKilled = true;
worker[util.NEED_KILLED] = true;
setTimeout(function() {

@@ -132,13 +97,7 @@ if (!worker.isConnected()) return;

const aliveWorkers = [];
for (const id in cluster.workers) {
const worker = cluster.workers[id];
if (!this.isAliveWorker(worker)) continue;
aliveWorkers.push(worker);
}
const aliveWorkers = util.getAliveWorkers();
if (!aliveWorkers.length) return;
// check alive workers has leak
let allowWorkers = this.options.workers;
if (this.options.enableAgent) allowWorkers++;
const allowWorkers = this.options.workers;
if (aliveWorkers.length > allowWorkers) {

@@ -176,10 +135,8 @@ console.error(`workers fork has leak, alive workers: ${aliveWorkers.length}, need workers: ${this.options.workers}`);

let idx = -1;
for (const id in cluster.workers) {
const worker = cluster.workers[id];
if (!this.isAliveWorker(worker) || util.isAgent(worker)) continue;
util.getAliveWorkers().some(worker => {
if (index === ++idx) {
worker.send(util.THINK_STICKY_CLUSTER, socket);
break;
return true;
}
}
});
});

@@ -186,0 +143,0 @@ server.listen(this.options.port, this.options.host, () => {

const cluster = require('cluster');
const helper = require('think-helper');
const events = require('events');
const util = require('./util.js');
const assert = require('assert');
const MessengerInit = Symbol('think-messenger-init');
const MESSENGER = 'think-messenger';

@@ -24,22 +25,8 @@

getWorkers(type = 'all', cWorker) {
const workers = [];
for (const id in cluster.workers) {
const worker = cluster.workers[id];
switch (type) {
case 'all':
workers.push(worker);
break;
case 'app':
if (!worker.isAgent) workers.push(worker);
break;
case 'agent':
if (worker.isAgent) workers.push(worker);
break;
case 'one':
if (!workers.length) workers.push(worker);
break;
}
const aliveWorkers = util.getAliveWorkers();
if (type === 'all') return aliveWorkers;
if (type === 'one') {
if (!aliveWorkers.length || aliveWorkers[0] !== cWorker) return [];
return [aliveWorkers[0]];
}
if (type === 'one' && workers[0] !== cWorker) return [];
return workers;
}

@@ -51,5 +38,2 @@ /**

bindEvent() {
if (process[MessengerInit]) return;
process[MessengerInit] = true;
if (cluster.isMaster) {

@@ -71,9 +55,2 @@ cluster.on('message', (worker, message) => {

/**
* setTimeout
* @param {Number} timeout []
*/
setTimeout(actionName, timeout = 3000) {
setTimeout(() => this.emit(actionName, new Error('timeout')), timeout);
}
/**
* broadcast

@@ -93,2 +70,9 @@ * @param {String} action []

/**
* this method will be deprecated
* @param {Function} callback
*/
runInOne(callback) {
return this.consume(callback);
}
/**
* run in one worker

@@ -98,16 +82,13 @@ * @param {Function} callback []

*/
runInOne(callback) {
const id = taskId++;
const actionName = `think-messenger-${id}`;
consume(callback) {
assert(helper.isFunction(callback), 'callback must be a function');
const action = `think-messenger-${taskId++}`;
process.send({
act: MESSENGER,
action: actionName,
action,
target: 'one'
});
this.once(actionName, data => {
if (!helper.isError(data) && callback) {
callback();
}
});
this.setTimeout(actionName);
this.once(action, callback);
// remove event callback after timeout, avoid memory leak
helper.timeout(10000).then(() => this.removeAllListeners(action));
}

@@ -114,0 +95,0 @@ }

@@ -7,2 +7,4 @@ const cluster = require('cluster');

const debug = require('debug')('think-cluster');
const WORKER_REALOD = Symbol('worker-reload');
const NEED_KILLED = Symbol('need-killed');

@@ -14,8 +16,6 @@ let thinkProcessId = 1;

exports.THINK_GRACEFUL_DISCONNECT = 'think-graceful-disconnect';
exports.THINK_AGENT_OPTIONS = 'think-agent-options';
exports.THINK_AGENT_CLOSED = 'think-agent-closed';
exports.THINK_STICKY_CLUSTER = 'think-sticky-cluster';
exports.WORKER_REALOD = WORKER_REALOD;
exports.NEED_KILLED = NEED_KILLED;
exports.PIN = 'PIN';
/**

@@ -29,22 +29,28 @@ * check worker is first

/**
* check is agent worker
* parse options
*/
exports.isAgent = function() {
return !!process.env.THINK_AGENT_WORKER;
exports.parseOptions = function(options = {}) {
options.workers = options.workers || cpus;
return options;
};
/**
* enable agent
* check worker is alive
*/
exports.enableAgent = function() {
return !!process.env.THINK_ENABLE_AGENT;
exports.isAliveWorker = worker => {
const state = worker.state;
if (state === 'disconnected' || state === 'dead') return false;
if (worker[NEED_KILLED] || worker[WORKER_REALOD]) return false;
return true;
};
/**
* parse options
* get alive workers
*/
exports.parseOptions = function(options = {}) {
options.workers = options.workers || cpus;
if (options.workers < 2) {
options.enableAgent = false;
exports.getAliveWorkers = () => {
const workers = [];
for (const id in cluster.workers) {
const worker = cluster.workers[id];
if (!exports.isAliveWorker(worker)) continue;
workers.push(worker);
}
return options;
return workers;
};

@@ -57,32 +63,25 @@

const deferred = helper.defer();
env.THINK_PROCESS_ID = env.THINK_AGENT_WORKER ? 0 : thinkProcessId++;
env.THINK_PROCESS_ID = thinkProcessId++;
const worker = cluster.fork(env);
if (env.THINK_AGENT_WORKER) {
worker.isAgent = true;
}
worker.on('message', message => {
if (worker.hasGracefulReload) return;
if (worker[WORKER_REALOD]) return;
if (message === exports.THINK_GRACEFUL_DISCONNECT) {
debug(`refork worker, receive message 'think-graceful-disconnect'`);
worker.hasGracefulReload = true;
exports.forkWorker(env).then(() => {
worker.send(util.THINK_GRACEFUL_FORK);
});
worker[WORKER_REALOD] = true;
exports.forkWorker(env).then(() => worker.send(util.THINK_GRACEFUL_FORK));
}
});
worker.once('disconnect', () => {
if (worker[WORKER_REALOD]) return;
debug(`worker disconnect`);
worker[WORKER_REALOD] = true;
exports.forkWorker(env);
});
worker.once('exit', (code, signal) => {
if (worker.hasGracefulReload) return;
if (worker[WORKER_REALOD]) return;
debug(`worker exit, code:${code}, signal:${signal}`);
worker[WORKER_REALOD] = true;
exports.forkWorker(env);
});
worker.once('listening', address => {
if (worker.isAgent) {
debug(`agent worker is listening, address:${JSON.stringify(address)}`);
// send agent server address to workers
for (const id in cluster.workers) {
const item = cluster.workers[id];
if (item.isAgent) continue;
item.send({act: util.THINK_AGENT_OPTIONS, address});
}
}
deferred.resolve({worker, address});

@@ -89,0 +88,0 @@ });

const util = require('./util.js');
const cluster = require('cluster');
const helper = require('think-helper');
const AgentClient = require('./agent_client.js');
const debug = require('debug')('think-cluster');
const KEEP_ALIVE = Symbol('think-graceful-keepalive');
const WORKER_RELOAD = Symbol('worker-reload');

@@ -77,2 +77,6 @@ /**

const worker = cluster.worker;
// if worker has diconnect, return directly
if (worker[WORKER_RELOAD]) return;
worker[WORKER_RELOAD] = true;
if (sendSignal) {

@@ -93,3 +97,3 @@ worker.send(util.THINK_GRACEFUL_DISCONNECT);

captureReloadSignal() {
process.once('message', message => {
process.on('message', message => {
if (message === util.THINK_RELOAD_SIGNAL) {

@@ -164,3 +168,2 @@ this.disconnectWorker(true);

this.captureReloadSignal();
AgentClient.getInstance();
}

@@ -167,0 +170,0 @@ /**

{
"name": "think-cluster",
"description": "Cluster manage for ThinkJS",
"version": "1.0.11",
"version": "1.1.0",
"author": {

@@ -6,0 +6,0 @@ "name": "welefen",

@@ -13,3 +13,2 @@ const test = require('ava');

let masterProcess = null;

@@ -23,11 +22,11 @@ test.afterEach.always(() => {

function executeProcess(fileName, options, funcName, callback) {
let scriptPath = path.join(__dirname, '../script', fileName);
masterProcess = spawn(`node`, [scriptPath, funcName , JSON.stringify(options)]);
const scriptPath = path.join(__dirname, '../script', fileName);
masterProcess = spawn(`node`, [scriptPath, funcName, JSON.stringify(options)]);
masterProcess.stdout.on('data', (buf) => {
try{
let json = JSON.parse(buf.toString('utf-8'));
try {
const json = JSON.parse(buf.toString('utf-8'));
callback(json);
}catch (e){
callback({message:buf.toString('utf-8')});
} catch (e) {
callback({message: buf.toString('utf-8')});
}

@@ -41,7 +40,7 @@ });

try {
let result = {};
let options = {
const result = {};
const options = {
workers: 1
};
executeProcess('master.js', options,'forkWorkers', (output) => {
executeProcess('master.js', options, 'forkWorkers', (output) => {
Object.assign(result, output);

@@ -58,4 +57,4 @@ });

try {
let result = {};
let options = {
const result = {};
const options = {
workers: 2,

@@ -65,3 +64,3 @@ reloadSignal: 'SIGUSR2',

};
executeProcess('master.js', options,'forkWorkers', (output) => {
executeProcess('master.js', options, 'forkWorkers', (output) => {
Object.assign(result, output);

@@ -78,4 +77,4 @@ });

try {
let result = {};
let options = {
const result = {};
const options = {
workers: 1,

@@ -85,3 +84,3 @@ reloadSignal: 'SIGUSR2',

};
executeProcess('master.js', options,'forkWorkers', (output) => {
executeProcess('master.js', options, 'forkWorkers', (output) => {
Object.assign(result, output);

@@ -99,8 +98,8 @@ });

try {
let result = {};
executeProcess('master.js', {workers:4}, 'reloadWorkers', (output) => {
const result = {};
executeProcess('master.js', {workers: 4}, 'reloadWorkers', (output) => {
Object.assign(result, output);
});
await sleep(interval * 2);
console.log(result);
// console.log(result);
t.notDeepEqual(result.beforeWorkers, result.afterWorkers);

@@ -113,7 +112,7 @@ } catch (e) {

try {
let result = {};
let options = {
reloadSignal: 'SIGUSR2',
const result = {};
const options = {
reloadSignal: 'SIGUSR2'
};
let masterProcess = executeProcess('master.js', options,'forkWorkers', (output) => {
const masterProcess = executeProcess('master.js', options, 'forkWorkers', (output) => {
Object.assign(result, output);

@@ -126,3 +125,3 @@ });

exec(`KILL -SIGUSR2 ${masterProcess.pid}`,{shell:'/bin/sh'},(error, stdout, stderr)=>{
exec(`KILL -SIGUSR2 ${masterProcess.pid}`, {shell: '/bin/sh'}, (error, stdout, stderr) => {
console.log(`stdout: ${stdout}`);

@@ -135,3 +134,2 @@ console.log(`stderr: ${stderr}`);

await sleep(interval);
} catch (e) {

@@ -143,7 +141,7 @@ }

try {
let result = {};
let options = {
const result = {};
const options = {
workers: 1
};
executeProcess('worker.js', options,'unHandleRejection', (output) => {
executeProcess('worker.js', options, 'unHandleRejection', (output) => {
Object.assign(result, output);

@@ -160,7 +158,7 @@ console.log(result);

try {
let result = {};
let options = {
const result = {};
const options = {
workers: 1
};
executeProcess('worker.js', options,'unCaughtException', (output) => {
executeProcess('worker.js', options, 'unCaughtException', (output) => {
Object.assign(result, output);

@@ -173,2 +171,2 @@ console.log(result);

}
});
});

@@ -15,25 +15,25 @@ const test = require('ava');

function mockCluster(){
mock('cluster',{
workers:[],
fork(env={}){
function mockCluster() {
mock('cluster', {
workers: [],
fork(env = {}) {
let worker = {
on(evtName,cb){
on(evtName, cb) {
this[evtName] = cb;
},
once(evtName,cb){
this.on(evtName,cb)
if(evtName === 'listening') {
cb('test address')
once(evtName, cb) {
this.on(evtName, cb);
if (evtName === 'listening') {
cb('test address');
}
},
trigger(evtName,args){
trigger(evtName, args) {
const cluster = require('cluster');
if(evtName === 'exit'){
let workers = Array.from(cluster.workers);
cluster.workers.forEach((item,index)=>{
if(item === this){
workers.splice(index,1)
if (evtName === 'exit') {
const workers = Array.from(cluster.workers);
cluster.workers.forEach((item, index) => {
if (item === this) {
workers.splice(index, 1);
}
})
});
cluster.workers = workers;

@@ -43,13 +43,13 @@ }

},
send(signal){
send(signal) {
// console.log(signal);
},
kill(){
kill() {
// this.isKilled = true;
},
isConnected(){
isConnected() {
return !this.isKilled;
},
process:{
kill:()=>{
process: {
kill: () => {
worker.isKilled = true;

@@ -59,14 +59,14 @@ }

};
worker = Object.assign(worker,env);
let cluster = require('cluster');
cluster.workers.push(worker)
worker = Object.assign(worker, env);
const cluster = require('cluster');
cluster.workers.push(worker);
return worker;
},
on: () => {},
trigger(evtName,args){
trigger(evtName, args) {
this.workers.forEach(worker => {
worker.trigger(evtName,args)
})
worker.trigger(evtName, args);
});
}
})
});
}

@@ -84,7 +84,7 @@

const Master = getMaster();
let instance = new Master();
const instance = new Master();
await instance.forkWorkers();
cluster.trigger('message','think-graceful-disconnect');
cluster.trigger('message','test');
t.is(cluster.workers[0].hasGracefulReload,true);
cluster.trigger('message', 'think-graceful-disconnect');
cluster.trigger('message', 'test');
t.is(cluster.workers[0].hasGracefulReload, undefined);
});

@@ -96,5 +96,5 @@

const Master = getMaster();
let instance = new Master();
const instance = new Master();
await instance.forkWorkers();
t.is(cluster.workers.length,require('os').cpus().length)
t.is(cluster.workers.length, require('os').cpus().length);
});

@@ -106,7 +106,7 @@

const Master = getMaster();
let instance = new Master();
const instance = new Master();
await instance.forkWorkers();
cluster.trigger('message','think-graceful-disconnect');
cluster.trigger('message', 'think-graceful-disconnect');
cluster.trigger('exit');
t.is(cluster.workers.length,require('os').cpus().length)
t.is(cluster.workers.length, require('os').cpus().length);
});

@@ -118,5 +118,5 @@

const Master = getMaster();
let instance = new Master({reloadSignal:'SIGUSR2'});
const instance = new Master({reloadSignal: 'SIGUSR2'});
await instance.forkWorkers();
await process.kill(process.pid,'SIGUSR2');
await process.kill(process.pid, 'SIGUSR2');
});

@@ -128,6 +128,6 @@

const Master = getMaster();
let instance = new Master({enableAgent:true});
const instance = new Master({});
await instance.forkWorkers();
cluster.trigger('listening')
t.is(cluster.workers.length, require('os').cpus().length+1)
cluster.trigger('listening');
t.is(cluster.workers.length, require('os').cpus().length);
});

@@ -139,7 +139,7 @@

const Master = getMaster();
let instance = new Master({enableAgent:true});
const instance = new Master({});
await instance.forkWorkers();
await instance.killWorker(cluster.workers[0]);
await sleep(1000);
t.is(cluster.workers[0].isKilled,true);
t.is(cluster.workers[0].isKilled, true);
});

@@ -151,10 +151,10 @@

const Master = getMaster();
let instance = new Master({enableAgent:true});
const instance = new Master({});
await instance.forkWorkers();
await instance.killWorker(cluster.workers[0],true);
await instance.killWorker(cluster.workers[0], true);
await sleep(1000);
await instance.killWorker(cluster.workers[0]);
await sleep(1000);
t.is(cluster.workers[0].isKilled,true);
t.is(cluster.workers[0].hasGracefulReload,true);
t.is(cluster.workers[0].isKilled, true);
t.is(cluster.workers[0].hasGracefulReload, undefined);
});

@@ -166,3 +166,3 @@

const Master = getMaster();
let instance = new Master();
const instance = new Master();
await instance.forkWorkers();

@@ -176,3 +176,3 @@ instance.forceReloadWorkers();

const Master = getMaster();
let instance = new Master({enableAgent:true});
const instance = new Master({});
await instance.forkWorkers();

@@ -187,3 +187,3 @@ cluster.workers[0].state = 'disconnected';

const Master = getMaster();
let instance = new Master({enableAgent:true});
const instance = new Master({});
await instance.forkWorkers();

@@ -197,6 +197,6 @@ instance.forceReloadWorkers();

const Master = getMaster();
let instance = new Master({enableAgent:true});
const instance = new Master({});
await instance.forkWorkers();
cluster.workers = [];
instance.forceReloadWorkers();
});
});

@@ -5,2 +5,4 @@ const test = require('ava');

mockCluster(true);
function getMessenger() {

@@ -10,40 +12,39 @@ return mock.reRequire('../../lib/messenger');

class events {
once(evtName,cb){
once(evtName, cb) {
cb();
}
emit(){}
emit() {}
}
function mockEvents(){
mock('events',events)
function mockEvents() {
mock('events', events);
}
function mockCluster(isMaster){
mock('cluster',{
receiveSignal:false,
workers:[
{type:'app',isAgent:false,send(){require('cluster').receiveSignal = true}},
{type:'agent',isAgent:true,send(){require('cluster').receiveSignal = true}}
],
function mockCluster(isMaster) {
mock('cluster', {
receiveSignal: false,
workers: {
1: {type: 'one', isAgent: false, send() { require('cluster').receiveSignal = true }},
2: {type: 'all', isAgent: true, send() { require('cluster').receiveSignal = true }}
},
isMaster,
on(evtName,cb){
on(evtName, cb) {
this[evtName] = cb;
},
trigger(evtName,message,worker){
this[evtName](worker,message)
trigger(evtName, message, worker) {
this[evtName](worker, message);
}
})
});
}
function mockProcess() {
process.on = (evtName,cb)=>{
process.on = (evtName, cb) => {
process[evtName] = cb;
}
};
process.trigger = (evtName,args={}) => {
process['is'+evtName] = true;
process.trigger = (evtName, args = {}) => {
process['is' + evtName] = true;
process[evtName](args);
}
};
}

@@ -54,7 +55,7 @@

const Messenger = getMessenger();
let m = new Messenger();
const m = new Messenger();
let flag = false;
const fn = ()=>{
const fn = () => {
flag = true;

@@ -64,51 +65,51 @@ };

m.runInOne(fn);
await sleep(5000)
t.is(flag,true);
await sleep(5000);
t.is(flag, true);
});
test('runInOne case', async t => {
test('runInOne case 2', async t => {
mockEvents();
const Messenger = getMessenger();
let m = new Messenger();
const m = new Messenger();
let flag = false;
const flag = false;
m.runInOne();
await sleep(5000)
t.is(flag,false);
m.runInOne(() => {});
await sleep(5000);
t.is(flag, false);
});
test('broadcast case', async t => {
test('broadcast case 8', async t => {
mockEvents();
const Messenger = getMessenger();
let m = new Messenger();
const m = new Messenger();
m.broadcast(()=>{},'test');
m.broadcast(() => {}, 'test');
});
test('bindEvent case', async t => {
test('bindEvent case 7', async t => {
mockCluster(true);
const cluster = require('cluster');
const Messenger = getMessenger();
let m = new Messenger();
const m = new Messenger();
const message = {
act:'think-messenger',
target:'one'
}
cluster.trigger('message',message,{});
act: 'think-messenger',
target: 'one'
};
cluster.trigger('message', message, {});
});
test('bindEvent case', async t => {
test('bindEvent case 6', async t => {
mockCluster(true);
const cluster = require('cluster');
const Messenger = getMessenger();
let m = new Messenger();
const m = new Messenger();
const message = {
act:'test',
target:'one'
}
cluster.trigger('message',message,{});
act: 'test',
target: 'one'
};
cluster.trigger('message', message, {});
});
test('bindEvent case', async t => {
test('bindEvent case 5', async t => {
mockCluster(false);

@@ -119,3 +120,3 @@ mockProcess();

const Messenger = getMessenger();
let m = new Messenger();
const m = new Messenger();
m.bindEvent();

@@ -125,8 +126,8 @@ const message = {

target: 'one'
}
process.trigger('message', message)
t.is(process['ismessage'],true)
};
process.trigger('message', message);
t.is(process['ismessage'], true);
});
test('bindEvent case', async t => {
test('bindEvent case 4', async t => {
mockCluster(false);

@@ -137,3 +138,3 @@ mockProcess();

const Messenger = getMessenger();
let m = new Messenger();
const m = new Messenger();
m.bindEvent();

@@ -143,45 +144,44 @@ const message = {

target: 'one'
}
process.trigger('message', message)
t.is(process['ismessage'],true)
};
process.trigger('message', message);
t.is(process['ismessage'], true);
});
test('bindEvent case', async t => {
test('bindEvent case 2', async t => {
mockCluster(true);
const cluster = require('cluster');
const Messenger = getMessenger();
let m = new Messenger();
const m = new Messenger();
const message = {
act:'think-messenger',
target:'all'
}
cluster.trigger('message',message,{});
t.is(cluster.receiveSignal,true)
act: 'think-messenger',
target: 'all'
};
cluster.trigger('message', message, {});
t.is(cluster.receiveSignal, true);
});
test('bindEvent case', async t => {
test('bindEvent case 3', async t => {
mockCluster(true);
const cluster = require('cluster');
const Messenger = getMessenger();
let m = new Messenger();
const m = new Messenger();
const message = {
act:'think-messenger',
target:'app'
}
cluster.trigger('message',message,{});
t.is(cluster.receiveSignal,true)
act: 'think-messenger',
target: 'all'
};
cluster.trigger('message', message, {});
t.is(cluster.receiveSignal, true);
});
test('bindEvent case', async t => {
test('bindEvent case 3', async t => {
mockCluster(true);
const cluster = require('cluster');
const Messenger = getMessenger();
let m = new Messenger();
const m = new Messenger();
const message = {
act:'think-messenger',
target:'agent'
}
cluster.trigger('message',message,{});
t.is(cluster.receiveSignal,true)
act: 'think-messenger',
target: 'all'
};
cluster.trigger('message', message, {});
t.is(cluster.receiveSignal, true);
});

@@ -193,9 +193,8 @@

const Messenger = getMessenger();
let m = new Messenger();
const m = new Messenger();
const message = {
act:'think-messenger',
}
cluster.trigger('message',message,{});
t.is(cluster.receiveSignal,true)
act: 'think-messenger'
};
cluster.trigger('message', message, {});
t.is(cluster.receiveSignal, true);
});

@@ -14,36 +14,36 @@ const test = require('ava');

function mockCluster(){
mock('cluster',{
worker:{
send(){},
once(){},
on(evtName,cb){
function mockCluster() {
mock('cluster', {
worker: {
send() {},
once() {},
on(evtName, cb) {
this[evtName] = cb;
},
trigger(evtName,args){
this[evtName](args)
trigger(evtName, args) {
this[evtName](args);
},
disconnect(){}
disconnect() {}
},
workers:[],
fork(env={}){
workers: [],
fork(env = {}) {
let worker = {
on(evtName,cb){
on(evtName, cb) {
this[evtName] = cb;
},
once(evtName,cb){
this.on(evtName,cb)
if(evtName === 'listening') {
cb('test address')
once(evtName, cb) {
this.on(evtName, cb);
if (evtName === 'listening') {
cb('test address');
}
},
trigger(evtName,args){
trigger(evtName, args) {
const cluster = require('cluster');
if(evtName === 'exit'){
let workers = Array.from(cluster.workers);
cluster.workers.forEach((item,index)=>{
if(item === this){
workers.splice(index,1)
if (evtName === 'exit') {
const workers = Array.from(cluster.workers);
cluster.workers.forEach((item, index) => {
if (item === this) {
workers.splice(index, 1);
}
})
});
cluster.workers = workers;

@@ -53,13 +53,13 @@ }

},
send(signal){
send(signal) {
// console.log(signal);
},
kill(){
kill() {
// this.isKilled = true;
},
isConnected(){
isConnected() {
return !this.isKilled;
},
process:{
kill:()=>{
process: {
kill: () => {
worker.isKilled = true;

@@ -69,31 +69,31 @@ }

};
worker = Object.assign(worker,env);
let cluster = require('cluster');
cluster.workers.push(worker)
worker = Object.assign(worker, env);
const cluster = require('cluster');
cluster.workers.push(worker);
cluster.worker = worker;
return worker;
},
trigger(evtName,args){
trigger(evtName, args) {
this.workers.forEach(worker => {
worker.trigger(evtName,args)
})
worker.trigger(evtName, args);
});
}
})
});
}
function mockProcess() {
process.on = (evtName,cb)=>{
process.on = (evtName, cb) => {
process[evtName] = cb;
}
};
process.once = (evtName,cb)=>{
process.once = (evtName, cb) => {
process[evtName] = cb;
}
};
process.exit = ()=>{
process.isKilled = true
}
process.exit = () => {
process.isKilled = true;
};
process.trigger = (evtName,args={}) => {
process['is'+evtName] = true;
process.trigger = (evtName, args = {}) => {
process['is' + evtName] = true;
process[evtName](args);

@@ -106,6 +106,5 @@

// }
}
};
}
// const defaultConfig = {

@@ -129,6 +128,6 @@ // server:{

const config = {
server:{
address:'http://localhost:8080'
server: {
address: 'http://localhost:8080'
},
onUnhandledRejection:(e)=>{
onUnhandledRejection: (e) => {
unhandledRejectionDid = true;

@@ -139,25 +138,23 @@ }

const Worker = getWorker();
let instance = new Worker(config);
const instance = new Worker(config);
instance.captureEvents();
const loudRejection = require('loud-rejection')
const loudRejection = require('loud-rejection');
loudRejection()
loudRejection();
let myp
let myp;
setTimeout(function () {
myp = new Promise(function (resolve, reject) {
setTimeout(reject, 100, new Error('Silence me'))
})
}, 100)
setTimeout(function () {
myp.catch(function (err) {
t.is(unhandledRejectionDid,true)
})
}, 300)
setTimeout(function() {
myp = new Promise(function(resolve, reject) {
setTimeout(reject, 100, new Error('Silence me'));
});
}, 100);
setTimeout(function() {
myp.catch(function(err) {
t.is(unhandledRejectionDid, true);
});
}, 300);
await sleep(2000)
await sleep(2000);
});

@@ -169,21 +166,21 @@

const config = {
server:{
address:'http://localhost:8080',
req : {},
res : {
setHeader(key,value){
server: {
address: 'http://localhost:8080',
req: {},
res: {
setHeader(key, value) {
this[key] = value;
}
},
on(evtName,cb){
on(evtName, cb) {
this[evtName] = cb;
},
close(){
close() {
},
trigger(eveName){
this[eveName](this.req,this.res);
trigger(eveName) {
this[eveName](this.req, this.res);
}
},
onUnhandledRejection:(e)=>{
onUnhandledRejection: (e) => {
unhandledRejectionDid = true;

@@ -195,14 +192,12 @@ }

const Worker = getWorker();
let instance = new Worker(config);
const instance = new Worker(config);
instance.server = config.server;
cluster.fork();
instance.disconnectWorker(true);
cluster.trigger('message','think-graceful-fork');
config.server.trigger('request')
cluster.trigger('message', 'think-graceful-fork');
config.server.trigger('request');
t.is(config.server.res.Connection,'close');
t.is(config.server.res.Connection, 'close');
});
test.serial('normal case 3', async t => {

@@ -213,24 +208,24 @@ mockProcess();

const config = {
server:{
address:'http://localhost:8080',
req : {},
res : {
setHeader(key,value){
server: {
address: 'http://localhost:8080',
req: {},
res: {
setHeader(key, value) {
this[key] = value;
}
},
on(evtName,cb){
on(evtName, cb) {
this[evtName] = cb;
},
close(){
close() {
},
trigger(eveName){
this[eveName](this.req,this.res);
trigger(eveName) {
this[eveName](this.req, this.res);
}
},
onUnhandledRejection:(e)=>{
onUnhandledRejection: (e) => {
unhandledRejectionDid = true;
},
processKillTimeout:null
processKillTimeout: null
};

@@ -240,3 +235,3 @@

const Worker = getWorker();
let instance = new Worker(config);
const instance = new Worker(config);
instance.server = config.server;

@@ -251,21 +246,21 @@ cluster.fork();

const config = {
server:{
address:'http://localhost:8080',
req : {},
res : {
setHeader(key,value){
server: {
address: 'http://localhost:8080',
req: {},
res: {
setHeader(key, value) {
this[key] = value;
}
},
on(evtName,cb){
on(evtName, cb) {
this[evtName] = cb;
},
close(){
close() {
},
trigger(eveName){
this[eveName](this.req,this.res);
trigger(eveName) {
this[eveName](this.req, this.res);
}
},
onUnhandledRejection:(e)=>{
onUnhandledRejection: (e) => {
unhandledRejectionDid = true;

@@ -277,6 +272,6 @@ }

const Worker = getWorker();
let instance = new Worker(config);
const instance = new Worker(config);
cluster.fork();
process.env.THINK_WORKERS = 1;
t.is(+instance.getWorkers(),1)
t.is(+instance.getWorkers(), 1);
});

@@ -290,32 +285,31 @@

const config = {
server:{
address:'http://localhost:8080',
req : {},
res : {
setHeader(key,value){
server: {
address: 'http://localhost:8080',
req: {},
res: {
setHeader(key, value) {
this[key] = value;
}
},
on(evtName,cb){
on(evtName, cb) {
this[evtName] = cb;
},
close(){
close() {
},
trigger(eveName){
this[eveName](this.req,this.res);
trigger(eveName) {
this[eveName](this.req, this.res);
}
},
onUnhandledRejection:(e)=>{
onUnhandledRejection: (e) => {
unhandledRejectionDid = true;
},
}
};
const Worker = getWorker();
let instance = new Worker(config);
const instance = new Worker(config);
instance.captureEvents();
process.trigger('uncaughtException');
t.is(process['isuncaughtException'],true)
t.is(process['isuncaughtException'], true);
});

@@ -327,32 +321,32 @@

const config = {
server:{
address:'http://localhost:8080',
req : {},
res : {
setHeader(key,value){
server: {
address: 'http://localhost:8080',
req: {},
res: {
setHeader(key, value) {
this[key] = value;
}
},
on(evtName,cb){
on(evtName, cb) {
this[evtName] = cb;
},
close(){
close() {
},
trigger(eveName){
this[eveName](this.req,this.res);
trigger(eveName) {
this[eveName](this.req, this.res);
}
},
onUnhandledRejection:(e)=>{
onUnhandledRejection: (e) => {
unhandledRejectionDid = true;
},
debug:true
debug: true
};
const Worker = getWorker();
let instance = new Worker(config);
const instance = new Worker(config);
instance.captureEvents();
process.trigger('uncaughtException');
t.is(process['isuncaughtException'],true)
t.is(process['isuncaughtException'], true);
});

@@ -364,32 +358,32 @@

const config = {
server:{
address:'http://localhost:8080',
req : {},
res : {
setHeader(key,value){
server: {
address: 'http://localhost:8080',
req: {},
res: {
setHeader(key, value) {
this[key] = value;
}
},
on(evtName,cb){
on(evtName, cb) {
this[evtName] = cb;
},
close(){
close() {
},
trigger(eveName){
this[eveName](this.req,this.res);
trigger(eveName) {
this[eveName](this.req, this.res);
}
},
onUnhandledRejection:(e)=>{
onUnhandledRejection: (e) => {
unhandledRejectionDid = true;
},
}
};
const Worker = getWorker();
let instance = new Worker(config);
const instance = new Worker(config);
instance.captureEvents();
process.trigger('message','think-reload-signal');
process.trigger('message','something');
t.is(process['ismessage'],true)
process.trigger('message', 'think-reload-signal');
process.trigger('message', 'something');
t.is(process['ismessage'], true);
});

@@ -401,24 +395,24 @@

const config = {
server:{
address:'http://localhost:8080',
req : {},
res : {
setHeader(key,value){
server: {
address: 'http://localhost:8080',
req: {},
res: {
setHeader(key, value) {
this[key] = value;
}
},
on(evtName,cb){
on(evtName, cb) {
this[evtName] = cb;
},
close(){
close() {
},
trigger(eveName){
this[eveName](this.req,this.res);
trigger(eveName) {
this[eveName](this.req, this.res);
}
},
}
};
const Worker = getWorker();
let instance = new Worker(config);
const instance = new Worker(config);
instance.server = config.server;

@@ -428,5 +422,5 @@

await sleep(1000)
await sleep(1000);
t.is(process.isKilled,undefined)
t.is(process.isKilled, undefined);
});

@@ -438,25 +432,25 @@

const config = {
server:{
address:'http://localhost:8080',
req : {},
res : {
setHeader(key,value){
server: {
address: 'http://localhost:8080',
req: {},
res: {
setHeader(key, value) {
this[key] = value;
}
},
on(evtName,cb){
on(evtName, cb) {
this[evtName] = cb;
},
close(){
close() {
},
trigger(eveName){
this[eveName](this.req,this.res);
trigger(eveName) {
this[eveName](this.req, this.res);
}
},
processKillTimeout:50
processKillTimeout: 50
};
const Worker = getWorker();
let instance = new Worker(config);
const instance = new Worker(config);
instance.server = config.server;

@@ -466,5 +460,5 @@

await sleep(1000)
await sleep(1000);
t.is(process.isKilled,true)
t.is(process.isKilled, true);
});

@@ -476,25 +470,25 @@

const config = {
server:{
address:'http://localhost:8080',
req : {},
res : {
setHeader(key,value){
server: {
address: 'http://localhost:8080',
req: {},
res: {
setHeader(key, value) {
this[key] = value;
}
},
on(evtName,cb){
on(evtName, cb) {
this[evtName] = cb;
},
close(){
close() {
},
trigger(eveName){
this[eveName](this.req,this.res);
trigger(eveName) {
this[eveName](this.req, this.res);
}
},
processKillTimeout:50
processKillTimeout: 50
};
const Worker = getWorker();
let instance = new Worker(config);
const instance = new Worker(config);
instance.server = config.server;

@@ -504,5 +498,5 @@

await sleep(1000)
await sleep(1000);
t.is(process.isKilled,true)
t.is(process.isKilled, true);
});

@@ -514,21 +508,21 @@

const config = {
server:{
address:'http://localhost:8080',
req : {},
res : {
setHeader(key,value){
server: {
address: 'http://localhost:8080',
req: {},
res: {
setHeader(key, value) {
this[key] = value;
}
},
on(evtName,cb){
on(evtName, cb) {
this[evtName] = cb;
},
close(cb){
cb()
close(cb) {
cb();
},
trigger(eveName){
this[eveName](this.req,this.res);
trigger(eveName) {
this[eveName](this.req, this.res);
}
},
processKillTimeout:null
processKillTimeout: null
};

@@ -538,3 +532,3 @@

const Worker = getWorker();
let instance = new Worker(config);
const instance = new Worker(config);
instance.server = config.server;

@@ -544,5 +538,4 @@

cluster.worker.trigger('disconnect')
t.is(process.isKilled,true)
cluster.worker.trigger('disconnect');
t.is(process.isKilled, true);
});

@@ -554,21 +547,21 @@

const config = {
server:{
address:'http://localhost:8080',
req : {},
res : {
setHeader(key,value){
server: {
address: 'http://localhost:8080',
req: {},
res: {
setHeader(key, value) {
this[key] = value;
}
},
on(evtName,cb){
on(evtName, cb) {
this[evtName] = cb;
},
close(cb){
cb()
close(cb) {
cb();
},
trigger(eveName){
this[eveName](this.req,this.res);
trigger(eveName) {
this[eveName](this.req, this.res);
}
},
disableKeepAlive:true
disableKeepAlive: true
};

@@ -578,4 +571,4 @@

const Worker = getWorker();
let instance = new Worker(config);
instance.captureEvents()
});
const instance = new Worker(config);
instance.captureEvents();
});
const cluster = require('cluster');
const http = require('http');
let ClusterMaster = require('../../index').Master;
const ClusterMaster = require('../../index').Master;
const sleep = time => new Promise(resolve => setTimeout(resolve, time));
let opt = Object.assign({}, JSON.parse(process.argv[3]));
let functionName = process.argv[2];
const opt = Object.assign({}, JSON.parse(process.argv[3]));
const functionName = process.argv[2];
function eachWorker(callback) {
for (let id in cluster.workers) {
for (const id in cluster.workers) {
callback(cluster.workers[id]);

@@ -14,13 +14,13 @@ }

let app = {
const app = {
forkWorkers: (options) => {
try {
if (cluster.isMaster) {
let instance = new ClusterMaster(options);
const instance = new ClusterMaster(options);
instance.forkWorkers().then(() => {
let workers = [];
const workers = [];
eachWorker((worker) => {
workers.push(worker.process.pid);
});
let result = {
const result = {
options,

@@ -37,3 +37,2 @@ workers,

process.send({cmd: 'notifyRequest'});
}).listen(8000);

@@ -49,5 +48,5 @@ }

if (cluster.isMaster) {
let instance = new ClusterMaster(options);
const instance = new ClusterMaster(options);
instance.forkWorkers().then(() => {
let beforeWorkers = [];
const beforeWorkers = [];
eachWorker((worker) => {

@@ -62,8 +61,8 @@ beforeWorkers.push(worker.process.pid);

// console.log(JSON.stringify(result));
sleep(2000).then(()=>{
sleep(2000).then(() => {
instance.forceReloadWorkers();
})
});
});
sleep(5000).then(() => {
let workers = [];
const workers = [];
eachWorker((worker) => {

@@ -75,3 +74,2 @@ workers.push(worker.process.pid);

});
} else {

@@ -82,3 +80,2 @@ http.Server((req, res) => {

process.send({cmd: 'notifyRequest'});
}).listen(8000);

@@ -85,0 +82,0 @@ }

const cluster = require('cluster');
const http = require('http');
let ClusterMaster = require('../../index').Master;
let ClusterWorker = require('../../index').Worker;
const ClusterMaster = require('../../index').Master;
const ClusterWorker = require('../../index').Worker;
let mockServer = {
on:(evtName)=>{
const mockServer = {
on: (evtName) => {
console.log(evtName);

@@ -12,19 +12,17 @@ }

const defaultOption = {
onUncaughtException:(err)=>{
onUncaughtException: (err) => {
console.log('onUncaughtException');
},
onUnhandledRejection:(err)=>{
onUnhandledRejection: (err) => {
console.log('onUnhandledRejection');
},
server:mockServer
server: mockServer
};
const sleep = time => new Promise(resolve => setTimeout(resolve, time));
let opt = Object.assign({}, defaultOption,JSON.parse(process.argv[3]));
let functionName = process.argv[2];
const opt = Object.assign({}, defaultOption, JSON.parse(process.argv[3]));
const functionName = process.argv[2];
function eachWorker(callback) {
for (let id in cluster.workers) {
for (const id in cluster.workers) {
callback(cluster.workers[id]);

@@ -34,13 +32,13 @@ }

let app = {
const app = {
unHandleRejection: (options) => {
try {
if (cluster.isMaster) {
let instance = new ClusterMaster(options);
const instance = new ClusterMaster(options);
instance.forkWorkers().then(() => {
let workers = [];
const workers = [];
eachWorker((worker) => {
workers.push(worker.process.pid);
});
let result = {
const result = {
options,

@@ -53,5 +51,5 @@ workers,

} else {
let workerInstance = new ClusterWorker(options);
const workerInstance = new ClusterWorker(options);
workerInstance.captureEvents();
sleep(3000).then(()=>{
sleep(3000).then(() => {
xxx();

@@ -69,12 +67,12 @@ });

},
unCaughtException:(options)=>{
unCaughtException: (options) => {
try {
if (cluster.isMaster) {
let instance = new ClusterMaster(options);
const instance = new ClusterMaster(options);
instance.forkWorkers().then(() => {
let workers = [];
const workers = [];
eachWorker((worker) => {
workers.push(worker.process.pid);
});
let result = {
const result = {
options,

@@ -87,7 +85,7 @@ workers,

} else {
let workerInstance = new ClusterWorker(options);
const workerInstance = new ClusterWorker(options);
workerInstance.captureEvents();
setTimeout(()=>{
setTimeout(() => {
xxx();
},3000)
}, 3000);
http.Server((req, res) => {

@@ -94,0 +92,0 @@ res.writeHead(200);

@@ -13,3 +13,2 @@ const test = require('ava');

let masterProcess = null;

@@ -23,11 +22,11 @@ test.afterEach.always(() => {

function executeProcess(fileName, options, funcName, callback) {
let scriptPath = path.join(__dirname, '../script', fileName);
masterProcess = spawn(`node`, [scriptPath, funcName , JSON.stringify(options)]);
const scriptPath = path.join(__dirname, '../script', fileName);
masterProcess = spawn(`node`, [scriptPath, funcName, JSON.stringify(options)]);
masterProcess.stdout.on('data', (buf) => {
try{
let json = JSON.parse(buf.toString('utf-8'));
try {
const json = JSON.parse(buf.toString('utf-8'));
callback(json);
}catch (e){
callback({message:buf.toString('utf-8')});
} catch (e) {
callback({message: buf.toString('utf-8')});
}

@@ -40,9 +39,9 @@ });

test.serial('normal case', async t => {
console.log('worker')
console.log('worker');
try {
let result = {};
let options = {
const result = {};
const options = {
workers: 4
};
executeProcess('worker.js', options,'forkWorkers', (output) => {
executeProcess('worker.js', options, 'forkWorkers', (output) => {
Object.assign(result, output);

@@ -56,2 +55,1 @@ });

});
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc