process-custodian
Advanced tools
Comparing version 0.2.2 to 0.3.0
@@ -11,3 +11,4 @@ 'use strict'; | ||
I_AM_MASTER: 'IAmNewMaster', | ||
I_AM_SLAVE: 'IAmSlave' | ||
I_AM_SLAVE: 'IAmSlave', | ||
STOP: 'Stop' | ||
}; |
@@ -19,2 +19,6 @@ 'use strict'; | ||
var _typeof2 = require('babel-runtime/helpers/typeof'); | ||
var _typeof3 = _interopRequireDefault(_typeof2); | ||
var _classCallCheck2 = require('babel-runtime/helpers/classCallCheck'); | ||
@@ -24,2 +28,6 @@ | ||
var _createClass2 = require('babel-runtime/helpers/createClass'); | ||
var _createClass3 = _interopRequireDefault(_createClass2); | ||
var ensureIndexExist = function () { | ||
@@ -73,2 +81,3 @@ var _ref2 = (0, _asyncToGenerator3.default)( /*#__PURE__*/_regenerator2.default.mark(function _callee(tickTimeInSeconds) { | ||
processStartAt: processStartAt, | ||
eventLoopLag: this.getEventLoopLag(), | ||
hostName: hostName, | ||
@@ -123,7 +132,7 @@ pid: pid | ||
_id: _constants.MASTER_KEY, | ||
FINGERPRINT: _fingerprint.FINGERPRINT, | ||
fingerprint: _fingerprint.FINGERPRINT, | ||
lastActivity: { $gte: renewDate } | ||
}, { | ||
$set: { | ||
FINGERPRINT: _fingerprint.FINGERPRINT, | ||
fingerprint: _fingerprint.FINGERPRINT, | ||
lastActivity: new Date() | ||
@@ -174,2 +183,3 @@ } | ||
_id: _constants.MASTER_KEY, | ||
fingerprint: _fingerprint.FINGERPRINT, | ||
lastActivity: new Date() | ||
@@ -204,3 +214,3 @@ } | ||
$set: { | ||
FINGERPRINT: _fingerprint.FINGERPRINT, | ||
fingerprint: _fingerprint.FINGERPRINT, | ||
lastActivity: new Date() | ||
@@ -262,54 +272,164 @@ } | ||
var ProcessCustodian = function ProcessCustodian(_ref) { | ||
var _this = this; | ||
var ProcessCustodian = function () { | ||
function ProcessCustodian(_ref) { | ||
var _this = this; | ||
var rawCollection = _ref.rawCollection, | ||
_ref$tickTimeInSecond = _ref.tickTimeInSeconds, | ||
tickTimeInSeconds = _ref$tickTimeInSecond === undefined ? 60 : _ref$tickTimeInSecond, | ||
_ref$marginTimeForRen = _ref.marginTimeForRenew, | ||
marginTimeForRenew = _ref$marginTimeForRen === undefined ? 10 : _ref$marginTimeForRen; | ||
(0, _classCallCheck3.default)(this, ProcessCustodian); | ||
this._isMaster = false; | ||
this._emitter = null; | ||
this._timeout = null; | ||
this._stop = null; | ||
var rawCollection = _ref.rawCollection, | ||
_ref$tickTimeInSecond = _ref.tickTimeInSeconds, | ||
tickTimeInSeconds = _ref$tickTimeInSecond === undefined ? 45 : _ref$tickTimeInSecond, | ||
_ref$marginTimeForRen = _ref.marginTimeForRenew, | ||
marginTimeForRenew = _ref$marginTimeForRen === undefined ? 15 : _ref$marginTimeForRen, | ||
_ref$standardHighLeve = _ref.standardHighLevel, | ||
standardHighLevel = _ref$standardHighLeve === undefined ? 100 : _ref$standardHighLeve; | ||
(0, _classCallCheck3.default)(this, ProcessCustodian); | ||
this._standardHighLevel = 99; | ||
this._smoothingFactor = 1 / 3; | ||
this._isMaster = false; | ||
this._emitter = null; | ||
this._timeout = null; | ||
this._currentLag = 0; | ||
this._expectedFiredTime = Infinity; | ||
this._stop = null; | ||
this.onTick = noop; | ||
this.onIAmNewMaster = noop; | ||
this.onIAmSlave = noop; | ||
this.onceTick = noop; | ||
this.onceIAmNewMaster = noop; | ||
this.onceIAmSlave = noop; | ||
this.isMaster = function () { | ||
return _this._isMaster; | ||
}; | ||
this.isMaster = function () { | ||
return _this._isMaster; | ||
}; | ||
this.stop = function () { | ||
if (_this._stop) { | ||
_this._stop(); | ||
_this._collection.deleteOne({ _id: _fingerprint.FINGERPRINT }); | ||
_this._collection.deleteOne({ id: _constants.MASTER_KEY, FINGERPRINT: _fingerprint.FINGERPRINT }); | ||
_this._stop = null; | ||
this.stop = function () { | ||
if (_this._stop) { | ||
_this._stop(); | ||
_this._collection.deleteOne({ _id: _fingerprint.FINGERPRINT }); | ||
_this._collection.deleteOne({ id: _constants.MASTER_KEY, FINGERPRINT: _fingerprint.FINGERPRINT }); | ||
_this._stop = null; | ||
_this._emitter.emit(_constants.EVENTS.STOP); | ||
} | ||
}; | ||
if (!rawCollection) { | ||
throw new Error('Missing Collection in constructor of ProcessCustodian'); | ||
} | ||
}; | ||
if ((typeof Meteor === 'undefined' ? 'undefined' : (0, _typeof3.default)(Meteor)) === 'object' && Meteor.bindEnvironment) { | ||
this._bindEnvironment = Meteor.bindEnvironment; | ||
} | ||
this._standardHighLevel = standardHighLevel; | ||
this._emitter = new _events2.default(); | ||
this._collection = rawCollection; | ||
ensureIndexExist.call(this, tickTimeInSeconds); | ||
this._stop = runActivityQueue.call(this, tickTimeInSeconds, marginTimeForRenew, true); | ||
if (!rawCollection) { | ||
throw new Error('Missing Collection in constructor of ProcessCustodian'); | ||
// preparing methods onTick, onIAmNewMaster, onIAmSlave | ||
(0, _values2.default)(_constants.EVENTS).forEach(function (key) { | ||
return _this['on' + key] = function (fn) { | ||
if (_this._bindEnvironment) { | ||
fn = _this._bindEnvironment(fn); | ||
} | ||
_this._emitter.on(key, fn); | ||
return function () { | ||
return _this._emitter.removeListener(key, fn); | ||
}; | ||
}; | ||
}); | ||
// once | ||
(0, _values2.default)(_constants.EVENTS).forEach(function (key) { | ||
return _this['once' + key] = function (fn) { | ||
if (_this._bindEnvironment) { | ||
fn = _this._bindEnvironment(fn); | ||
} | ||
_this._emitter.once(key, fn); | ||
}; | ||
}); | ||
} | ||
this._emitter = new _events2.default(); | ||
this._collection = rawCollection; | ||
ensureIndexExist.call(this, tickTimeInSeconds); | ||
this._stop = runActivityQueue.call(this, tickTimeInSeconds, marginTimeForRenew, true); | ||
// preparing methods onTick, onIAmNewMaster, onIAmSlave | ||
(0, _values2.default)(_constants.EVENTS).forEach(function (key) { | ||
return _this['on' + key] = function (fn) { | ||
_this._emitter.on(key, fn); | ||
return function () { | ||
return _this._emitter.removeListener(key, fn); | ||
}; | ||
}; | ||
}); | ||
// once | ||
(0, _values2.default)(_constants.EVENTS).forEach(function (key) { | ||
return _this['once' + key] = function (fn) { | ||
_this._emitter.once(key, fn); | ||
}; | ||
}); | ||
}; | ||
/** | ||
* fired on every checking of the event loop | ||
* @param {function} - event handler | ||
* @returns {function} - stop handler | ||
*/ | ||
/** | ||
* fired every time when process will being a master | ||
* @param {function} - event handler | ||
* @returns {function} - stop handler | ||
*/ | ||
/** | ||
* runs every time when process is going back to be a slave | ||
* @param {function} - event handler | ||
* @returns {function} - stop listening handler | ||
*/ | ||
/** | ||
* Runs the passed function on next checking of the event loop | ||
* @param {function} - event handler | ||
* @returns {function} - stop handler | ||
*/ | ||
/** | ||
* fired once when process being be a master | ||
* @param {function} - event handler | ||
* @returns {function} - stop handler | ||
*/ | ||
/** | ||
* fired one time when process will lost master status | ||
* @param {function} - event handler | ||
* @returns {function} - stop handler | ||
*/ | ||
/** | ||
* Is this master process? | ||
* @returns {boolean} | ||
*/ | ||
(0, _createClass3.default)(ProcessCustodian, [{ | ||
key: 'getEventLoopLag', | ||
/** | ||
* lag value from last sampling of event loop | ||
* @returns {number} | ||
*/ | ||
value: function getEventLoopLag() { | ||
return Math.round(this._currentLag); | ||
} | ||
/** | ||
* Checks if the event loop lag is on too high level. | ||
* Using this information you can put away of computing new job, | ||
* or even stop the request processing early then process will freeze | ||
*/ | ||
}, { | ||
key: 'isOverloaded', | ||
value: function isOverloaded() { | ||
return this._currentLag > this._standardHighLevel; | ||
} | ||
/** | ||
* Uniq fingerprint of process | ||
*/ | ||
}, { | ||
key: 'getFingerprint', | ||
value: function getFingerprint() { | ||
return _fingerprint.FINGERPRINT; | ||
} | ||
/** | ||
* stops sampling loop | ||
*/ | ||
}]); | ||
return ProcessCustodian; | ||
}(); | ||
exports.default = ProcessCustodian; | ||
@@ -336,3 +456,3 @@ | ||
var _ref6 = (0, _asyncToGenerator3.default)( /*#__PURE__*/_regenerator2.default.mark(function _callee5() { | ||
var wasMaster; | ||
var wasMaster, lag; | ||
return _regenerator2.default.wrap(function _callee5$(_context5) { | ||
@@ -343,41 +463,52 @@ while (1) { | ||
wasMaster = _this2._isMaster; | ||
_context5.prev = 1; | ||
lag = Math.max(0, Date.now() - _this2._expectedFiredTime); | ||
// @see https://en.wikipedia.org/wiki/Exponential_smoothing | ||
// we weigh the current value against the previous value 3:1 to smooth bounds. | ||
_this2._currentLag = _this2._smoothingFactor * lag + (1 - _this2._smoothingFactor) * _this2._currentLag; | ||
_context5.prev = 3; | ||
if (!wasMaster) { | ||
_context5.next = 8; | ||
_context5.next = 10; | ||
break; | ||
} | ||
_context5.next = 5; | ||
_context5.next = 7; | ||
return renewingMasterReservation.call(_this2, tickTimeInSeconds, marginTimeForRenew); | ||
case 5: | ||
case 7: | ||
_this2._isMaster = _context5.sent; | ||
_context5.next = 11; | ||
_context5.next = 14; | ||
break; | ||
case 8: | ||
_context5.next = 10; | ||
case 10: | ||
if (_this2.isOverloaded()) { | ||
_context5.next = 14; | ||
break; | ||
} | ||
_context5.next = 13; | ||
return tryBeMaster.call(_this2, tickTimeInSeconds, marginTimeForRenew, isInit); | ||
case 10: | ||
case 13: | ||
_this2._isMaster = _context5.sent; | ||
case 11: | ||
_context5.next = 13; | ||
case 14: | ||
_context5.next = 16; | ||
return oneHeartbeat.call(_this2, tickTimeInSeconds); | ||
case 13: | ||
_context5.next = 18; | ||
case 16: | ||
_context5.next = 21; | ||
break; | ||
case 15: | ||
_context5.prev = 15; | ||
_context5.t0 = _context5['catch'](1); | ||
case 18: | ||
_context5.prev = 18; | ||
_context5.t0 = _context5['catch'](3); | ||
console.error('ActivityQueue:', _context5.t0); | ||
case 18: | ||
_context5.prev = 18; | ||
case 21: | ||
_context5.prev = 21; | ||
_this2._expectedFiredTime = Date.now() + tickTimeInSeconds * 1000; | ||
runActivityQueue.call(_this2, tickTimeInSeconds, marginTimeForRenew); | ||
@@ -391,5 +522,5 @@ _this2._emitter.emit(_constants.EVENTS.TICK); | ||
} | ||
return _context5.finish(18); | ||
return _context5.finish(21); | ||
case 24: | ||
case 28: | ||
case 'end': | ||
@@ -399,3 +530,3 @@ return _context5.stop(); | ||
} | ||
}, _callee5, _this2, [[1, 15, 18, 24]]); | ||
}, _callee5, _this2, [[3, 18, 21, 28]]); | ||
})); | ||
@@ -411,4 +542,10 @@ | ||
this._timeout = setTimeout(doTick, tickTimeInSeconds * 1000); | ||
// unref-ing the timeout, so to not be the only action that would hold the node-process open. | ||
this._timeout.unref && this._timeout.unref(); | ||
} | ||
return _stop.bind(this); | ||
} | ||
function noop() { | ||
/* noop */ | ||
} |
{ | ||
"name": "process-custodian", | ||
"version": "0.2.2", | ||
"version": "0.3.0", | ||
"description": "This package helps with organizing of tasks between few instances of same app, It can identify processes and track them activity.", | ||
@@ -42,3 +42,4 @@ "main": "index.js", | ||
"nodemon": "1.7.x", | ||
"pre-commit": "^1.1.3" | ||
"pre-commit": "^1.1.3", | ||
"sinon": "^3.2.1" | ||
}, | ||
@@ -45,0 +46,0 @@ "dependencies": { |
@@ -49,2 +49,6 @@ # Process Custodian | ||
}); | ||
const lag = handle.getEventLoopLag(); | ||
const stopSomething = handle.isOverloaded(); | ||
@@ -51,0 +55,0 @@ ``` |
@@ -0,8 +1,67 @@ | ||
import {MASTER_KEY, EVENTS} from '../lib/constants'; | ||
import ProcessCustodian from '../lib/ProcessCustodian'; | ||
import {mockCollection} from './mock'; | ||
import {mock, useFakeTimers} from 'sinon'; | ||
import {expect} from 'chai'; | ||
const {describe, it} = global; | ||
const tickTimeInSeconds = 10; | ||
const clock = useFakeTimers(); | ||
const porocessCustodian = new ProcessCustodian({ | ||
rawCollection: mockCollection(), | ||
tickTimeInSeconds, | ||
marginTimeForRenew: 2 | ||
}); | ||
describe('test 1', () => { | ||
it('case 1', () => { | ||
expect('ala').to.be.typeof('string'); | ||
describe('lagging', () => { | ||
it('should be instance of ProcessCustodian', () => { | ||
expect(porocessCustodian).to.be.instanceof(ProcessCustodian); | ||
expect(porocessCustodian.getEventLoopLag).to.be.function; | ||
expect(porocessCustodian.isOverloaded).to.be.function; | ||
expect(porocessCustodian.onceIAmNewMaster).to.be.function; | ||
expect(porocessCustodian.onceIAmSlave).to.be.function; | ||
expect(porocessCustodian.onceTick).to.be.function; | ||
expect(porocessCustodian.onceStop).to.be.function; | ||
expect(porocessCustodian.onIAmNewMaster).to.be.function; | ||
expect(porocessCustodian.onIAmSlave).to.be.function; | ||
expect(porocessCustodian.onTick).to.be.function; | ||
}); | ||
it('should return no lag ', () => { | ||
const noLag = porocessCustodian.getEventLoopLag(); | ||
expect(noLag).be.equal(0); | ||
}); | ||
it('should once called on tick should', () => { | ||
const exp = mock({once(){}}).expects('once'); | ||
expect(porocessCustodian.onceTick(() => { | ||
exp.once(); | ||
console.log('Once called onceTick', exp); | ||
})).to.be.an('undefined'); | ||
clock.tick(90); | ||
clock.tick(510); | ||
exp.verify(); | ||
clock.tick(780); | ||
}); | ||
it('should return a lag value after a little load', function() { | ||
const ts = Date.now(); | ||
console.log('Start heavy work'); | ||
clock.tick(200001); | ||
heavyWork(200); | ||
clock.tick(200001); | ||
heavyWork(200); | ||
clock.tick(200001); | ||
console.log('Stop heavy work'); | ||
const lag = porocessCustodian.getEventLoopLag(); | ||
expect(lag).be.above(1); | ||
}); | ||
}); | ||
function heavyWork(duration) { | ||
let a = 1; | ||
for (let i = 0; i < 1e7; i++) { | ||
a = a + 1; | ||
} | ||
porocessCustodian.onTick(()=> console.log('time')); | ||
for (let i = 0; i < 1e7; i++) { | ||
a = a + 1; | ||
} | ||
console.log('a', a); | ||
} |
@@ -7,3 +7,4 @@ | ||
I_AM_MASTER: 'IAmNewMaster', | ||
I_AM_SLAVE: 'IAmSlave' | ||
I_AM_SLAVE: 'IAmSlave', | ||
STOP: 'Stop' | ||
}; |
@@ -20,11 +20,19 @@ import {MASTER_KEY, EVENTS} from './constants'; | ||
class ProcessCustodian { | ||
_standardHighLevel = 99; | ||
_smoothingFactor = 1/3; | ||
_isMaster = false; | ||
_emitter = null; | ||
_timeout = null; | ||
_currentLag = 0; | ||
_expectedFiredTime = Infinity; | ||
_stop = null; | ||
constructor ({rawCollection, tickTimeInSeconds = 60, marginTimeForRenew = 10}) { | ||
constructor ({rawCollection, tickTimeInSeconds = 45, marginTimeForRenew = 15, standardHighLevel = 100}) { | ||
if (!rawCollection) { | ||
throw new Error('Missing Collection in constructor of ProcessCustodian'); | ||
} | ||
if (typeof Meteor === 'object' && Meteor.bindEnvironment) { | ||
this._bindEnvironment = Meteor.bindEnvironment; | ||
} | ||
this._standardHighLevel = standardHighLevel; | ||
this._emitter = new EventEmitter(); | ||
@@ -37,2 +45,5 @@ this._collection = rawCollection; | ||
Object.values(EVENTS).forEach(key => this[`on${key}`] = (fn => { | ||
if (this._bindEnvironment) { | ||
fn = this._bindEnvironment(fn); | ||
} | ||
this._emitter.on(key, fn); | ||
@@ -43,2 +54,5 @@ return () => this._emitter.removeListener(key, fn); | ||
Object.values(EVENTS).forEach(key => this[`once${key}`] = (fn => { | ||
if (this._bindEnvironment) { | ||
fn = this._bindEnvironment(fn); | ||
} | ||
this._emitter.once(key, fn); | ||
@@ -48,2 +62,44 @@ })); | ||
/** | ||
* fired on every checking of the event loop | ||
* @param {function} - event handler | ||
* @returns {function} - stop handler | ||
*/ | ||
onTick = noop; | ||
/** | ||
* fired every time when process will being a master | ||
* @param {function} - event handler | ||
* @returns {function} - stop handler | ||
*/ | ||
onIAmNewMaster = noop; | ||
/** | ||
* runs every time when process is going back to be a slave | ||
* @param {function} - event handler | ||
* @returns {function} - stop listening handler | ||
*/ | ||
onIAmSlave = noop; | ||
/** | ||
* Runs the passed function on next checking of the event loop | ||
* @param {function} - event handler | ||
* @returns {function} - stop handler | ||
*/ | ||
onceTick = noop; | ||
/** | ||
* fired once when process being be a master | ||
* @param {function} - event handler | ||
* @returns {function} - stop handler | ||
*/ | ||
onceIAmNewMaster = noop; | ||
/** | ||
* fired one time when process will lost master status | ||
* @param {function} - event handler | ||
* @returns {function} - stop handler | ||
*/ | ||
onceIAmSlave = noop; | ||
/** | ||
* Is this master process? | ||
* @returns {boolean} | ||
*/ | ||
isMaster = () => { | ||
@@ -53,2 +109,29 @@ return this._isMaster; | ||
/** | ||
* lag value from last sampling of event loop | ||
* @returns {number} | ||
*/ | ||
getEventLoopLag () { | ||
return Math.round(this._currentLag); | ||
} | ||
/** | ||
* Checks if the event loop lag is on too high level. | ||
* Using this information you can put away of computing new job, | ||
* or even stop the request processing early then process will freeze | ||
*/ | ||
isOverloaded () { | ||
return this._currentLag > this._standardHighLevel; | ||
} | ||
/** | ||
* Uniq fingerprint of process | ||
*/ | ||
getFingerprint () { | ||
return FINGERPRINT; | ||
} | ||
/** | ||
* stops sampling loop | ||
*/ | ||
stop = () => { | ||
@@ -60,2 +143,3 @@ if (this._stop) { | ||
this._stop = null; | ||
this._emitter.emit(EVENTS.STOP); | ||
} | ||
@@ -85,2 +169,3 @@ }; | ||
processStartAt: processStartAt, | ||
eventLoopLag: this.getEventLoopLag(), | ||
hostName, | ||
@@ -108,7 +193,7 @@ pid | ||
_id: MASTER_KEY, | ||
FINGERPRINT, | ||
fingerprint: FINGERPRINT, | ||
lastActivity: {$gte: renewDate} | ||
}, { | ||
$set: { | ||
FINGERPRINT, | ||
fingerprint: FINGERPRINT, | ||
lastActivity: new Date() | ||
@@ -133,2 +218,3 @@ } | ||
_id: MASTER_KEY, | ||
fingerprint: FINGERPRINT, | ||
lastActivity: new Date() | ||
@@ -152,3 +238,3 @@ } | ||
$set: { | ||
FINGERPRINT, | ||
fingerprint: FINGERPRINT, | ||
lastActivity: new Date() | ||
@@ -180,6 +266,11 @@ } | ||
const wasMaster = this._isMaster; | ||
let lag = Math.max(0, (Date.now() - this._expectedFiredTime)); | ||
// @see https://en.wikipedia.org/wiki/Exponential_smoothing | ||
// we weigh the current value against the previous value 3:1 to smooth bounds. | ||
this._currentLag = this._smoothingFactor * lag + (1 - this._smoothingFactor) * this._currentLag; | ||
try { | ||
if (wasMaster) { | ||
this._isMaster = await renewingMasterReservation.call(this, tickTimeInSeconds, marginTimeForRenew); | ||
} else { | ||
} else if (!this.isOverloaded()) { | ||
//don't want to try to be a master is with crossed standard lag limit | ||
this._isMaster = await tryBeMaster.call(this, tickTimeInSeconds, marginTimeForRenew, isInit); | ||
@@ -192,2 +283,3 @@ } | ||
finally { | ||
this._expectedFiredTime = Date.now() + tickTimeInSeconds * 1000; | ||
runActivityQueue.call(this, tickTimeInSeconds, marginTimeForRenew); | ||
@@ -207,2 +299,4 @@ this._emitter.emit(EVENTS.TICK); | ||
this._timeout = setTimeout(doTick, tickTimeInSeconds * 1000); | ||
// unref-ing the timeout, so to not be the only action that would hold the node-process open. | ||
this._timeout.unref && this._timeout.unref(); | ||
} | ||
@@ -212,2 +306,4 @@ return _stop.bind(this); | ||
function noop () { | ||
/* noop */ | ||
} |
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
40662
20
890
100
18