think-cluster
Advanced tools
Comparing version 1.0.8 to 1.0.9
const util = require('./lib/util.js'); | ||
const messenger = require('./lib/messenger.js'); | ||
const Messenger = require('./lib/messenger.js'); | ||
@@ -9,4 +9,4 @@ exports.Worker = require('./lib/worker.js'); | ||
exports.messenger = new messenger(); | ||
exports.messenger = new Messenger(); | ||
exports.isAgent = util.isAgent; | ||
exports.isFirstWorker = util.isFirstWorker; | ||
exports.isFirstWorker = util.isFirstWorker; |
const cluster = require('cluster'); | ||
const util = require('./util.js'); | ||
const net = require('net'); | ||
const stringHash = require('string-hash'); | ||
@@ -7,2 +9,14 @@ let waitReloadWorkerTimes = 0; | ||
/** | ||
* default options | ||
*/ | ||
const defaultOptions = { | ||
port: 0, // server listen port | ||
host: '', // server listen host | ||
sticky: false, // sticky cluster | ||
getRemoteAddress: socket => socket.remoteAddress, | ||
workers: 0, // fork worker nums | ||
reloadSignal: '' // reload workers signal | ||
}; | ||
/** | ||
* Master class | ||
@@ -16,3 +30,4 @@ */ | ||
constructor(options) { | ||
this.options = util.parseOptions(options); | ||
options = util.parseOptions(options); | ||
this.options = Object.assign({}, defaultOptions, options); | ||
} | ||
@@ -27,14 +42,12 @@ /** | ||
const worker = cluster.workers[id]; | ||
if (!this.isAliveWorker(worker)) continue; | ||
if (!this.isAliveWorker(worker) || util.isAgent(worker)) continue; | ||
worker.send(util.THINK_RELOAD_SIGNAL); | ||
} | ||
}; | ||
if (signal) { | ||
process.on(signal, reloadWorkers); | ||
} | ||
if (signal) process.on(signal, reloadWorkers); | ||
// if receive message `think-cluster-reload-workers` from worker, restart all workers | ||
cluster.on('message', (worker, message) => { | ||
if (message === 'think-cluster-reload-workers') { | ||
reloadWorkers(); | ||
} | ||
if (message !== 'think-cluster-reload-workers') return; | ||
reloadWorkers(); | ||
}); | ||
@@ -47,3 +60,4 @@ } | ||
isAliveWorker(worker) { | ||
if (worker.state === 'disconnected' || worker.needKilled) { | ||
const state = worker.state; | ||
if (state === 'disconnected' || state === 'dead' || worker.needKilled) { | ||
return false; | ||
@@ -109,3 +123,3 @@ } | ||
/** | ||
* force reload all workers, in development env | ||
* force reload all workers when code is changed, in development env | ||
*/ | ||
@@ -126,5 +140,10 @@ forceReloadWorkers() { | ||
if (!aliveWorkers.length) return; | ||
if (aliveWorkers.length > this.options.workers) { | ||
// check alive workers has leak | ||
let allowWorkers = this.options.workers; | ||
if (this.options.enableAgent) allowWorkers++; | ||
if (aliveWorkers.length > allowWorkers) { | ||
console.error(`workers fork has leak, alive workers: ${aliveWorkers.length}, need workers: ${this.options.workers}`); | ||
} | ||
const firstWorker = aliveWorkers.shift(); | ||
@@ -148,4 +167,37 @@ const promise = util.forkWorker(this.getForkEnv()).then(() => { | ||
} | ||
/** | ||
* create server with sticky | ||
* https://github.com/uqee/sticky-cluster | ||
*/ | ||
createServer() { | ||
const deferred = think.defer(); | ||
const server = net.createServer({pauseOnConnect: true}, socket => { | ||
const remoteAddress = this.options.getRemoteAddress(socket) || ''; | ||
const index = stringHash(remoteAddress) % this.options.workers; | ||
let idx = -1; | ||
for (const id in cluster.workers) { | ||
const worker = cluster.workers[id]; | ||
if (!this.isAliveWorker(worker) || util.isAgent(worker)) continue; | ||
if (index === ++idx) { | ||
worker.send(util.THINK_STICKY_CLUSTER, socket); | ||
break; | ||
} | ||
} | ||
}); | ||
server.listen(this.options.port, this.options.host, () => { | ||
this.forkWorkers().then(() => { | ||
deferred.resolve(); | ||
}); | ||
}); | ||
return deferred.promise; | ||
} | ||
/** | ||
* start server, support sticky | ||
*/ | ||
startServer() { | ||
if (!this.options.sticky) return this.forkWorkers(); | ||
return this.createServer(); | ||
} | ||
} | ||
module.exports = Master; |
@@ -15,2 +15,3 @@ const cluster = require('cluster'); | ||
exports.THINK_AGENT_CLOSED = 'think-agent-closed'; | ||
exports.THINK_STICKY_CLUSTER = 'think-sticky-cluster'; | ||
@@ -17,0 +18,0 @@ exports.PIN = 'PIN'; |
const util = require('./util.js'); | ||
const cluster = require('cluster'); | ||
const assert = require('assert'); | ||
const helper = require('think-helper'); | ||
const AgentClient = require('./agent_client.js'); | ||
@@ -12,7 +12,9 @@ | ||
const defaultOptions = { | ||
debug: false, | ||
logger: console.error.bind(console), | ||
disableKeepAlive: false, // disabled connect keep alive | ||
onUncaughtException: () => {}, // onUncaughtException event handle | ||
onUnhandledRejection: () => {}, // onUnhandledRejection event handle | ||
port: 0, | ||
host: '', | ||
sticky: false, | ||
createServer: () => {}, | ||
logger: () => {}, | ||
onUncaughtException: () => false, // onUncaughtException event handle | ||
onUnhandledRejection: () => false, // onUnhandledRejection event handle | ||
processKillTimeout: 10 * 1000 // 10s | ||
@@ -108,6 +110,7 @@ }; | ||
errTimes++; | ||
this.options.onUncaughtException(err); | ||
this.options.logger(`uncaughtException, times: ${errTimes}, pid: ${process.pid}`); | ||
this.options.logger(err.stack); | ||
if (errTimes === 1 && !this.options.debug) { | ||
this.options.logger(err); | ||
const flag = this.options.onUncaughtException(err); | ||
if (errTimes === 1 && flag) { | ||
this.disconnectWorker(true); | ||
@@ -124,16 +127,42 @@ } | ||
rejectTimes++; | ||
this.options.onUnhandledRejection(err); | ||
this.options.logger(`unhandledRejection, times: ${rejectTimes}, pid: ${process.pid}`); | ||
this.options.logger(err); | ||
const flag = this.options.onUnhandledRejection(err); | ||
if (rejectTimes === 1 && flag) { | ||
this.disconnectWorker(true); | ||
} | ||
}); | ||
} | ||
/** | ||
* listen port | ||
*/ | ||
listen() { | ||
const deferred = helper.defer(); | ||
this.server = this.options.createServer(); | ||
if (!this.options.sticky) { | ||
this.server.listen(this.options.port, this.options.host, () => { | ||
deferred.resolve(); | ||
}); | ||
} else { | ||
process.on('message', (message, socket) => { | ||
if (message !== util.THINK_STICKY_CLUSTER) return; | ||
// emulate a connection event on the server by emitting the | ||
// event with the connection master sent to us | ||
this.server.emit('connection', socket); | ||
// resume as we already catched the conn | ||
socket.resume(); | ||
}); | ||
// start on random port, accept conn from this host only | ||
this.server.listen(0, '127.0.0.1', () => { | ||
deferred.resolve(); | ||
}); | ||
} | ||
return deferred.promise; | ||
} | ||
/** | ||
* capture events | ||
*/ | ||
captureEvents() { | ||
assert(this.options.server, 'options.server required'); | ||
this.uncaughtException(); | ||
this.unhandledRejection(); | ||
if (this.options.disableKeepAlive) { | ||
this.disableKeepAlive(); | ||
} | ||
this.captureReloadSignal(); | ||
@@ -143,2 +172,9 @@ AgentClient.getInstance(); | ||
/** | ||
* start server | ||
*/ | ||
startServer() { | ||
this.captureEvents(); | ||
return this.listen(); | ||
} | ||
/** | ||
* get workers | ||
@@ -145,0 +181,0 @@ */ |
{ | ||
"name": "think-cluster", | ||
"description": "Cluster manage for ThinkJS", | ||
"version": "1.0.8", | ||
"version": "1.0.9", | ||
"author": { | ||
@@ -23,2 +23,4 @@ "name": "welefen", | ||
"dependencies": { | ||
"debug": "^2.6.8", | ||
"string-hash": "^1.1.3", | ||
"think-helper": "^1.0.0" | ||
@@ -29,3 +31,3 @@ }, | ||
"eslint": "^4.2.0", | ||
"eslint-config-eslint": "^4.0.0", | ||
"eslint-config-think": "^1.0.2", | ||
"mock-require": "^2.0.1", | ||
@@ -32,0 +34,0 @@ "nyc": "^7.0.0" |
@@ -16,3 +16,5 @@ const test = require('ava'); | ||
test.serial('normal case', async t => { | ||
process.env.THINK_ENABLE_AGENT = false; | ||
const cls = delegate(MyClass); | ||
@@ -28,3 +30,2 @@ let instance = new cls(); | ||
const cls = delegate(MyClass); | ||
let obj = new cls(); | ||
t.is(cls.name,'MyClass') | ||
@@ -36,6 +37,2 @@ }); | ||
// | ||
// constructor(){ | ||
// console.log('myclass constructor') | ||
// } | ||
// | ||
// static delegateMethods(){ | ||
@@ -45,9 +42,7 @@ // return ['get','set'] | ||
// | ||
// get(name){ | ||
// console.log('get') | ||
// } | ||
// get(name){} | ||
// | ||
// set(name,value){} | ||
// } | ||
// | ||
// function delegate(cls){ | ||
@@ -58,3 +53,2 @@ // let delegateCls = class delegateCls extends cls {}; | ||
// delegateCls.prototype.constructor = function(){ | ||
// console.log('constructor') | ||
// cArgs = arguments; | ||
@@ -74,21 +68,4 @@ // delegateCls.apply(this, arguments); | ||
// | ||
// let delegateCls = delegate(MyClass); | ||
// let d = delegate(MyClass); | ||
// | ||
// const ins = new delegateCls(); | ||
// | ||
// ins.get(); | ||
// class childClass extends MyClass{ | ||
// constructor(){ | ||
// console.log('child constructor') | ||
// } | ||
// } | ||
// | ||
// new childClass(); | ||
// console.log(new d()) |
@@ -62,2 +62,3 @@ const test = require('ava'); | ||
}, | ||
on: () => {}, | ||
trigger(evtName,args){ | ||
@@ -124,3 +125,3 @@ this.workers.forEach(worker => { | ||
cluster.trigger('listening') | ||
t.is(cluster.workers.length,require('os').cpus().length+1) | ||
t.is(cluster.workers.length, require('os').cpus().length+1) | ||
}); | ||
@@ -127,0 +128,0 @@ |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
New author
Supply chain riskA new npm collaborator published a version of the package for the first time. New collaborators are usually benign additions to a project, but do indicate a change to the security surface area of a package.
Found 1 instance in 1 package
57241
2096
3
+ Addeddebug@^2.6.8
+ Addedstring-hash@^1.1.3
+ Addeddebug@2.6.9(transitive)
+ Addedms@2.0.0(transitive)
+ Addedstring-hash@1.1.3(transitive)