Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

process-custodian

Package Overview
Dependencies
Maintainers
1
Versions
11
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

process-custodian - npm Package Compare versions

Comparing version 0.2.2 to 0.3.0

src/__tests__/mock.js

3

dist/lib/constants.js

@@ -11,3 +11,4 @@ 'use strict';

I_AM_MASTER: 'IAmNewMaster',
I_AM_SLAVE: 'IAmSlave'
I_AM_SLAVE: 'IAmSlave',
STOP: 'Stop'
};

@@ -19,2 +19,6 @@ 'use strict';

var _typeof2 = require('babel-runtime/helpers/typeof');
var _typeof3 = _interopRequireDefault(_typeof2);
var _classCallCheck2 = require('babel-runtime/helpers/classCallCheck');

@@ -24,2 +28,6 @@

var _createClass2 = require('babel-runtime/helpers/createClass');
var _createClass3 = _interopRequireDefault(_createClass2);
var ensureIndexExist = function () {

@@ -73,2 +81,3 @@ var _ref2 = (0, _asyncToGenerator3.default)( /*#__PURE__*/_regenerator2.default.mark(function _callee(tickTimeInSeconds) {

processStartAt: processStartAt,
eventLoopLag: this.getEventLoopLag(),
hostName: hostName,

@@ -123,7 +132,7 @@ pid: pid

_id: _constants.MASTER_KEY,
FINGERPRINT: _fingerprint.FINGERPRINT,
fingerprint: _fingerprint.FINGERPRINT,
lastActivity: { $gte: renewDate }
}, {
$set: {
FINGERPRINT: _fingerprint.FINGERPRINT,
fingerprint: _fingerprint.FINGERPRINT,
lastActivity: new Date()

@@ -174,2 +183,3 @@ }

_id: _constants.MASTER_KEY,
fingerprint: _fingerprint.FINGERPRINT,
lastActivity: new Date()

@@ -204,3 +214,3 @@ }

$set: {
FINGERPRINT: _fingerprint.FINGERPRINT,
fingerprint: _fingerprint.FINGERPRINT,
lastActivity: new Date()

@@ -262,54 +272,164 @@ }

var ProcessCustodian = function ProcessCustodian(_ref) {
var _this = this;
var ProcessCustodian = function () {
function ProcessCustodian(_ref) {
var _this = this;
var rawCollection = _ref.rawCollection,
_ref$tickTimeInSecond = _ref.tickTimeInSeconds,
tickTimeInSeconds = _ref$tickTimeInSecond === undefined ? 60 : _ref$tickTimeInSecond,
_ref$marginTimeForRen = _ref.marginTimeForRenew,
marginTimeForRenew = _ref$marginTimeForRen === undefined ? 10 : _ref$marginTimeForRen;
(0, _classCallCheck3.default)(this, ProcessCustodian);
this._isMaster = false;
this._emitter = null;
this._timeout = null;
this._stop = null;
var rawCollection = _ref.rawCollection,
_ref$tickTimeInSecond = _ref.tickTimeInSeconds,
tickTimeInSeconds = _ref$tickTimeInSecond === undefined ? 45 : _ref$tickTimeInSecond,
_ref$marginTimeForRen = _ref.marginTimeForRenew,
marginTimeForRenew = _ref$marginTimeForRen === undefined ? 15 : _ref$marginTimeForRen,
_ref$standardHighLeve = _ref.standardHighLevel,
standardHighLevel = _ref$standardHighLeve === undefined ? 100 : _ref$standardHighLeve;
(0, _classCallCheck3.default)(this, ProcessCustodian);
this._standardHighLevel = 99;
this._smoothingFactor = 1 / 3;
this._isMaster = false;
this._emitter = null;
this._timeout = null;
this._currentLag = 0;
this._expectedFiredTime = Infinity;
this._stop = null;
this.onTick = noop;
this.onIAmNewMaster = noop;
this.onIAmSlave = noop;
this.onceTick = noop;
this.onceIAmNewMaster = noop;
this.onceIAmSlave = noop;
this.isMaster = function () {
return _this._isMaster;
};
this.isMaster = function () {
return _this._isMaster;
};
this.stop = function () {
if (_this._stop) {
_this._stop();
_this._collection.deleteOne({ _id: _fingerprint.FINGERPRINT });
_this._collection.deleteOne({ id: _constants.MASTER_KEY, FINGERPRINT: _fingerprint.FINGERPRINT });
_this._stop = null;
this.stop = function () {
if (_this._stop) {
_this._stop();
_this._collection.deleteOne({ _id: _fingerprint.FINGERPRINT });
_this._collection.deleteOne({ id: _constants.MASTER_KEY, FINGERPRINT: _fingerprint.FINGERPRINT });
_this._stop = null;
_this._emitter.emit(_constants.EVENTS.STOP);
}
};
if (!rawCollection) {
throw new Error('Missing Collection in constructor of ProcessCustodian');
}
};
if ((typeof Meteor === 'undefined' ? 'undefined' : (0, _typeof3.default)(Meteor)) === 'object' && Meteor.bindEnvironment) {
this._bindEnvironment = Meteor.bindEnvironment;
}
this._standardHighLevel = standardHighLevel;
this._emitter = new _events2.default();
this._collection = rawCollection;
ensureIndexExist.call(this, tickTimeInSeconds);
this._stop = runActivityQueue.call(this, tickTimeInSeconds, marginTimeForRenew, true);
if (!rawCollection) {
throw new Error('Missing Collection in constructor of ProcessCustodian');
// preparing methods onTick, onIAmNewMaster, onIAmSlave
(0, _values2.default)(_constants.EVENTS).forEach(function (key) {
return _this['on' + key] = function (fn) {
if (_this._bindEnvironment) {
fn = _this._bindEnvironment(fn);
}
_this._emitter.on(key, fn);
return function () {
return _this._emitter.removeListener(key, fn);
};
};
});
// once
(0, _values2.default)(_constants.EVENTS).forEach(function (key) {
return _this['once' + key] = function (fn) {
if (_this._bindEnvironment) {
fn = _this._bindEnvironment(fn);
}
_this._emitter.once(key, fn);
};
});
}
this._emitter = new _events2.default();
this._collection = rawCollection;
ensureIndexExist.call(this, tickTimeInSeconds);
this._stop = runActivityQueue.call(this, tickTimeInSeconds, marginTimeForRenew, true);
// preparing methods onTick, onIAmNewMaster, onIAmSlave
(0, _values2.default)(_constants.EVENTS).forEach(function (key) {
return _this['on' + key] = function (fn) {
_this._emitter.on(key, fn);
return function () {
return _this._emitter.removeListener(key, fn);
};
};
});
// once
(0, _values2.default)(_constants.EVENTS).forEach(function (key) {
return _this['once' + key] = function (fn) {
_this._emitter.once(key, fn);
};
});
};
/**
* fired on every checking of the event loop
* @param {function} - event handler
* @returns {function} - stop handler
*/
/**
* fired every time when process will being a master
* @param {function} - event handler
* @returns {function} - stop handler
*/
/**
* runs every time when process is going back to be a slave
* @param {function} - event handler
* @returns {function} - stop listening handler
*/
/**
* Runs the passed function on next checking of the event loop
* @param {function} - event handler
* @returns {function} - stop handler
*/
/**
* fired once when process being be a master
* @param {function} - event handler
* @returns {function} - stop handler
*/
/**
* fired one time when process will lost master status
* @param {function} - event handler
* @returns {function} - stop handler
*/
/**
* Is this master process?
* @returns {boolean}
*/
(0, _createClass3.default)(ProcessCustodian, [{
key: 'getEventLoopLag',
/**
* lag value from last sampling of event loop
* @returns {number}
*/
value: function getEventLoopLag() {
return Math.round(this._currentLag);
}
/**
* Checks if the event loop lag is on too high level.
* Using this information you can put away of computing new job,
* or even stop the request processing early then process will freeze
*/
}, {
key: 'isOverloaded',
value: function isOverloaded() {
return this._currentLag > this._standardHighLevel;
}
/**
* Uniq fingerprint of process
*/
}, {
key: 'getFingerprint',
value: function getFingerprint() {
return _fingerprint.FINGERPRINT;
}
/**
* stops sampling loop
*/
}]);
return ProcessCustodian;
}();
exports.default = ProcessCustodian;

@@ -336,3 +456,3 @@

var _ref6 = (0, _asyncToGenerator3.default)( /*#__PURE__*/_regenerator2.default.mark(function _callee5() {
var wasMaster;
var wasMaster, lag;
return _regenerator2.default.wrap(function _callee5$(_context5) {

@@ -343,41 +463,52 @@ while (1) {

wasMaster = _this2._isMaster;
_context5.prev = 1;
lag = Math.max(0, Date.now() - _this2._expectedFiredTime);
// @see https://en.wikipedia.org/wiki/Exponential_smoothing
// we weigh the current value against the previous value 3:1 to smooth bounds.
_this2._currentLag = _this2._smoothingFactor * lag + (1 - _this2._smoothingFactor) * _this2._currentLag;
_context5.prev = 3;
if (!wasMaster) {
_context5.next = 8;
_context5.next = 10;
break;
}
_context5.next = 5;
_context5.next = 7;
return renewingMasterReservation.call(_this2, tickTimeInSeconds, marginTimeForRenew);
case 5:
case 7:
_this2._isMaster = _context5.sent;
_context5.next = 11;
_context5.next = 14;
break;
case 8:
_context5.next = 10;
case 10:
if (_this2.isOverloaded()) {
_context5.next = 14;
break;
}
_context5.next = 13;
return tryBeMaster.call(_this2, tickTimeInSeconds, marginTimeForRenew, isInit);
case 10:
case 13:
_this2._isMaster = _context5.sent;
case 11:
_context5.next = 13;
case 14:
_context5.next = 16;
return oneHeartbeat.call(_this2, tickTimeInSeconds);
case 13:
_context5.next = 18;
case 16:
_context5.next = 21;
break;
case 15:
_context5.prev = 15;
_context5.t0 = _context5['catch'](1);
case 18:
_context5.prev = 18;
_context5.t0 = _context5['catch'](3);
console.error('ActivityQueue:', _context5.t0);
case 18:
_context5.prev = 18;
case 21:
_context5.prev = 21;
_this2._expectedFiredTime = Date.now() + tickTimeInSeconds * 1000;
runActivityQueue.call(_this2, tickTimeInSeconds, marginTimeForRenew);

@@ -391,5 +522,5 @@ _this2._emitter.emit(_constants.EVENTS.TICK);

}
return _context5.finish(18);
return _context5.finish(21);
case 24:
case 28:
case 'end':

@@ -399,3 +530,3 @@ return _context5.stop();

}
}, _callee5, _this2, [[1, 15, 18, 24]]);
}, _callee5, _this2, [[3, 18, 21, 28]]);
}));

@@ -411,4 +542,10 @@

this._timeout = setTimeout(doTick, tickTimeInSeconds * 1000);
// unref-ing the timeout, so to not be the only action that would hold the node-process open.
this._timeout.unref && this._timeout.unref();
}
return _stop.bind(this);
}
function noop() {
/* noop */
}
{
"name": "process-custodian",
"version": "0.2.2",
"version": "0.3.0",
"description": "This package helps with organizing of tasks between few instances of same app, It can identify processes and track them activity.",

@@ -42,3 +42,4 @@ "main": "index.js",

"nodemon": "1.7.x",
"pre-commit": "^1.1.3"
"pre-commit": "^1.1.3",
"sinon": "^3.2.1"
},

@@ -45,0 +46,0 @@ "dependencies": {

@@ -49,2 +49,6 @@ # Process Custodian

});
const lag = handle.getEventLoopLag();
const stopSomething = handle.isOverloaded();

@@ -51,0 +55,0 @@ ```

@@ -0,8 +1,67 @@

import {MASTER_KEY, EVENTS} from '../lib/constants';
import ProcessCustodian from '../lib/ProcessCustodian';
import {mockCollection} from './mock';
import {mock, useFakeTimers} from 'sinon';
import {expect} from 'chai';
const {describe, it} = global;
const tickTimeInSeconds = 10;
const clock = useFakeTimers();
const porocessCustodian = new ProcessCustodian({
rawCollection: mockCollection(),
tickTimeInSeconds,
marginTimeForRenew: 2
});
describe('test 1', () => {
it('case 1', () => {
expect('ala').to.be.typeof('string');
describe('lagging', () => {
it('should be instance of ProcessCustodian', () => {
expect(porocessCustodian).to.be.instanceof(ProcessCustodian);
expect(porocessCustodian.getEventLoopLag).to.be.function;
expect(porocessCustodian.isOverloaded).to.be.function;
expect(porocessCustodian.onceIAmNewMaster).to.be.function;
expect(porocessCustodian.onceIAmSlave).to.be.function;
expect(porocessCustodian.onceTick).to.be.function;
expect(porocessCustodian.onceStop).to.be.function;
expect(porocessCustodian.onIAmNewMaster).to.be.function;
expect(porocessCustodian.onIAmSlave).to.be.function;
expect(porocessCustodian.onTick).to.be.function;
});
it('should return no lag ', () => {
const noLag = porocessCustodian.getEventLoopLag();
expect(noLag).be.equal(0);
});
it('should once called on tick should', () => {
const exp = mock({once(){}}).expects('once');
expect(porocessCustodian.onceTick(() => {
exp.once();
console.log('Once called onceTick', exp);
})).to.be.an('undefined');
clock.tick(90);
clock.tick(510);
exp.verify();
clock.tick(780);
});
it('should return a lag value after a little load', function() {
const ts = Date.now();
console.log('Start heavy work');
clock.tick(200001);
heavyWork(200);
clock.tick(200001);
heavyWork(200);
clock.tick(200001);
console.log('Stop heavy work');
const lag = porocessCustodian.getEventLoopLag();
expect(lag).be.above(1);
});
});
function heavyWork(duration) {
let a = 1;
for (let i = 0; i < 1e7; i++) {
a = a + 1;
}
porocessCustodian.onTick(()=> console.log('time'));
for (let i = 0; i < 1e7; i++) {
a = a + 1;
}
console.log('a', a);
}

@@ -7,3 +7,4 @@

I_AM_MASTER: 'IAmNewMaster',
I_AM_SLAVE: 'IAmSlave'
I_AM_SLAVE: 'IAmSlave',
STOP: 'Stop'
};

@@ -20,11 +20,19 @@ import {MASTER_KEY, EVENTS} from './constants';

class ProcessCustodian {
_standardHighLevel = 99;
_smoothingFactor = 1/3;
_isMaster = false;
_emitter = null;
_timeout = null;
_currentLag = 0;
_expectedFiredTime = Infinity;
_stop = null;
constructor ({rawCollection, tickTimeInSeconds = 60, marginTimeForRenew = 10}) {
constructor ({rawCollection, tickTimeInSeconds = 45, marginTimeForRenew = 15, standardHighLevel = 100}) {
if (!rawCollection) {
throw new Error('Missing Collection in constructor of ProcessCustodian');
}
if (typeof Meteor === 'object' && Meteor.bindEnvironment) {
this._bindEnvironment = Meteor.bindEnvironment;
}
this._standardHighLevel = standardHighLevel;
this._emitter = new EventEmitter();

@@ -37,2 +45,5 @@ this._collection = rawCollection;

Object.values(EVENTS).forEach(key => this[`on${key}`] = (fn => {
if (this._bindEnvironment) {
fn = this._bindEnvironment(fn);
}
this._emitter.on(key, fn);

@@ -43,2 +54,5 @@ return () => this._emitter.removeListener(key, fn);

Object.values(EVENTS).forEach(key => this[`once${key}`] = (fn => {
if (this._bindEnvironment) {
fn = this._bindEnvironment(fn);
}
this._emitter.once(key, fn);

@@ -48,2 +62,44 @@ }));

/**
* fired on every checking of the event loop
* @param {function} - event handler
* @returns {function} - stop handler
*/
onTick = noop;
/**
* fired every time when process will being a master
* @param {function} - event handler
* @returns {function} - stop handler
*/
onIAmNewMaster = noop;
/**
* runs every time when process is going back to be a slave
* @param {function} - event handler
* @returns {function} - stop listening handler
*/
onIAmSlave = noop;
/**
* Runs the passed function on next checking of the event loop
* @param {function} - event handler
* @returns {function} - stop handler
*/
onceTick = noop;
/**
* fired once when process being be a master
* @param {function} - event handler
* @returns {function} - stop handler
*/
onceIAmNewMaster = noop;
/**
* fired one time when process will lost master status
* @param {function} - event handler
* @returns {function} - stop handler
*/
onceIAmSlave = noop;
/**
* Is this master process?
* @returns {boolean}
*/
isMaster = () => {

@@ -53,2 +109,29 @@ return this._isMaster;

/**
* lag value from last sampling of event loop
* @returns {number}
*/
getEventLoopLag () {
return Math.round(this._currentLag);
}
/**
* Checks if the event loop lag is on too high level.
* Using this information you can put away of computing new job,
* or even stop the request processing early then process will freeze
*/
isOverloaded () {
return this._currentLag > this._standardHighLevel;
}
/**
* Uniq fingerprint of process
*/
getFingerprint () {
return FINGERPRINT;
}
/**
* stops sampling loop
*/
stop = () => {

@@ -60,2 +143,3 @@ if (this._stop) {

this._stop = null;
this._emitter.emit(EVENTS.STOP);
}

@@ -85,2 +169,3 @@ };

processStartAt: processStartAt,
eventLoopLag: this.getEventLoopLag(),
hostName,

@@ -108,7 +193,7 @@ pid

_id: MASTER_KEY,
FINGERPRINT,
fingerprint: FINGERPRINT,
lastActivity: {$gte: renewDate}
}, {
$set: {
FINGERPRINT,
fingerprint: FINGERPRINT,
lastActivity: new Date()

@@ -133,2 +218,3 @@ }

_id: MASTER_KEY,
fingerprint: FINGERPRINT,
lastActivity: new Date()

@@ -152,3 +238,3 @@ }

$set: {
FINGERPRINT,
fingerprint: FINGERPRINT,
lastActivity: new Date()

@@ -180,6 +266,11 @@ }

const wasMaster = this._isMaster;
let lag = Math.max(0, (Date.now() - this._expectedFiredTime));
// @see https://en.wikipedia.org/wiki/Exponential_smoothing
// we weigh the current value against the previous value 3:1 to smooth bounds.
this._currentLag = this._smoothingFactor * lag + (1 - this._smoothingFactor) * this._currentLag;
try {
if (wasMaster) {
this._isMaster = await renewingMasterReservation.call(this, tickTimeInSeconds, marginTimeForRenew);
} else {
} else if (!this.isOverloaded()) {
//don't want to try to be a master is with crossed standard lag limit
this._isMaster = await tryBeMaster.call(this, tickTimeInSeconds, marginTimeForRenew, isInit);

@@ -192,2 +283,3 @@ }

finally {
this._expectedFiredTime = Date.now() + tickTimeInSeconds * 1000;
runActivityQueue.call(this, tickTimeInSeconds, marginTimeForRenew);

@@ -207,2 +299,4 @@ this._emitter.emit(EVENTS.TICK);

this._timeout = setTimeout(doTick, tickTimeInSeconds * 1000);
// unref-ing the timeout, so to not be the only action that would hold the node-process open.
this._timeout.unref && this._timeout.unref();
}

@@ -212,2 +306,4 @@ return _stop.bind(this);

function noop () {
/* noop */
}

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc