Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

lru-cache-for-clusters-as-promised

Package Overview
Dependencies
Maintainers
1
Versions
49
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

lru-cache-for-clusters-as-promised - npm Package Compare versions

Comparing version 1.6.1 to 1.7.0

lib/master-messages.js

7

CHANGELOG.md

@@ -0,1 +1,8 @@

1.7.0 / 2021-03-25
==================
* Refactoring for maintainability
* Update dependencies
* More reliable test coverage
1.6.1 / 2021-03-20

@@ -2,0 +9,0 @@ ==================

357

lib/master.js

@@ -6,5 +6,7 @@ const cluster = require('cluster');

const utils = require('./utils');
const config = require('../config');
const utils = require('./utils');
const masterMessages = require('./master-messages');
const debug = new Debug(`${config.source}-master`);

@@ -16,17 +18,2 @@ const messages = new Debug(`${config.source}-messages`);

function getLruCache(cache, options) {
if (caches[cache.namespace]) {
debug(`Loaded cache from shared namespace ${cache.namespace}`);
return caches[cache.namespace];
}
const lru = new LRUCache(options);
caches[cache.namespace] = lru;
if (options.prune) {
lru.job = startPruneCronJob(lru, options.prune, cache.namespace);
}
debug(`Created new LRU cache ${cache.namespace}`);
return lru;
}
/**

@@ -54,140 +41,120 @@ * Starts a cron job to prune stale objects from the cache

const processMessages = function () {
if (cluster.isMaster) {
// for each worker created...
cluster.on('fork', (worker) => {
// wait for the worker to send a message
worker.on('message', (request) => {
if (request.source !== config.source) return;
messages(`Master recieved message from worker ${worker.id}`, request);
// this code will only run on the master to set up handles for messages from the workers
if (cluster.isMaster) {
// for each worker created...
cluster.on('fork', (worker) => {
// wait for the worker to send a message
worker.on('message', (request) => {
if (request.source !== config.source) return;
messages(`Master recieved message from worker ${worker.id}`, request);
return masterMessages.getMessageHandler(request.func)(
caches,
request,
worker,
{
startPruneCronJob,
}
);
});
});
}
// try to load an existing lru-cache
const lru = caches[request.namespace];
function getLruCache(caches, cache, options, startPruneCronJob) {
if (caches[cache.namespace]) {
debug(`Loaded cache from shared namespace ${cache.namespace}`);
return caches[cache.namespace];
}
const params = request.arguments;
const lru = new LRUCache(options);
caches[cache.namespace] = lru;
if (options.prune && startPruneCronJob) {
lru.job = startPruneCronJob(lru, options.prune, cache.namespace);
}
debug(`Created new LRU cache ${cache.namespace}`);
return lru;
}
/**
* Sends the response back to the worker thread
* @param {Object} data The response from the cache
* @param {Object} request The request
* @param {Object} worker The worker sending the response
*/
function sendResponseToWorker(data) {
const response = data;
response.source = config.source;
response.id = request.id;
response.func = request.func;
messages(`Master sending response to worker ${worker.id}`, response);
worker.send(response);
}
const getCacheConfigValue = ({
caches,
namespace,
options,
func,
funcArgs,
}) => {
const lru = getLruCache(caches, namespace, options);
return new Promise((resolve) => {
if (funcArgs[0]) {
lru[func] = funcArgs[0];
}
return resolve(lru[func]);
});
};
const constructLruCache = function () {
let created = false;
let lru = caches[request.namespace];
const options = params[0];
// create a new lru-cache, give it a namespace, and save it locally
if (caches[request.namespace]) {
lru = caches[request.namespace];
// update property values as needed
['max', 'maxAge', 'stale'].forEach((prop) => {
if (options[prop] && options[prop] !== lru[prop]) {
lru[prop] = options[prop];
}
});
} else {
created = true;
lru = caches[request.namespace] = new LRUCache(...params);
// start a job to clean the cache
if (params[0].prune) {
lru.job = startPruneCronJob(
lru,
params[0].prune,
request.namespace
);
}
}
sendResponseToWorker({
value: {
namespace: request.namespace,
isnew: created,
max: lru.max,
maxAge: lru.maxAge,
stale: lru.stale,
},
});
};
const incrementOrDecrement = ({
caches,
namespace,
options,
func,
funcArgs,
startPruneCronJob,
}) => {
const lru = getLruCache(caches, namespace, options, startPruneCronJob);
return new Promise((resolve) => {
// get the current value default to 0
let value = lru.get(funcArgs[0]);
// maybe initialize and increment
value =
(typeof value === 'number' ? value : 0) +
(funcArgs[1] || 1) * (func === 'decr' ? -1 : 1);
// set the new value
lru.set(funcArgs[0], value);
// resolve the new value
return resolve(value);
});
};
const getCacheConfigValue = function () {
if (params[0]) {
lru[request.func] = params[0];
}
return sendResponseToWorker({
value: lru[request.func],
});
};
const getMultipleValues = (options) => handleMultipleValues('mGet', options);
const setMultipleValues = (options) => handleMultipleValues('mSet', options);
const deleteMultipleValues = (options) => handleMultipleValues('mDel', options);
const incrementOrDecrement = function () {
// get the current value
let value = lru.get(params[0]);
// maybe initialize and increment
value =
(typeof value === 'number' ? value : 0) +
(params[1] || 1) * (request.func === 'decr' ? -1 : 1);
// set the new value
lru.set(params[0], value);
// send the new value
return sendResponseToWorker({
value,
});
};
const handleMultipleValues = (
func,
{ namespace, options, funcArgs, startPruneCronJob }
) => {
const lru = getLruCache(caches, namespace, options, startPruneCronJob);
return new Promise((resolve) => {
return resolve(utils[func](lru, funcArgs));
});
};
const getMultipleValues = function () {
const mGetValues = utils.mGet(lru, params);
return sendResponseToWorker({ value: mGetValues });
};
const defaultLruFunction = ({
caches,
namespace,
options,
func,
funcArgs,
startPruneCronJob,
}) => {
const lru = getLruCache(caches, namespace, options, startPruneCronJob);
return new Promise((resolve) => {
return resolve(lru[func](...funcArgs));
});
};
const setMultipleValues = function () {
utils.mSet(lru, params);
return sendResponseToWorker({ value: true });
};
const deleteMultipleValues = function () {
utils.mDel(lru, params);
return sendResponseToWorker({ value: true });
};
const promiseHandlerFunctions = {
mGet: getMultipleValues,
mSet: setMultipleValues,
mDel: deleteMultipleValues,
decr: incrementOrDecrement,
incr: incrementOrDecrement,
max: getCacheConfigValue,
maxAge: getCacheConfigValue,
stale: getCacheConfigValue,
itemCount: getCacheConfigValue,
length: getCacheConfigValue,
};
switch (request.func) {
// constructor request
case '()': {
return constructLruCache();
}
case 'max':
case 'maxAge':
case 'stale':
case 'length':
case 'itemCount': {
return getCacheConfigValue();
}
case 'decr':
case 'incr': {
return incrementOrDecrement();
}
case 'mGet': {
return getMultipleValues();
}
case 'mSet': {
return setMultipleValues();
}
case 'mDel': {
return deleteMultipleValues();
}
// return the function value
default: {
return sendResponseToWorker({
value: lru[request.func](...params),
});
}
}
});
});
}
const getPromiseHandler = (func) => {
const handler = promiseHandlerFunctions[func];
return handler ? handler : defaultLruFunction;
};

@@ -198,85 +165,21 @@

// lru-cache on the master thread if this is a worker
const getPromisified = (namespace, options) => {
const lru = getLruCache(namespace, options);
return (...args) => {
// first argument is the function to run
const func = args[0];
// the rest of the args are the function arguments of N length
const funcArgs = Array.prototype.slice.call(args, 1, args.length);
// acting on the local lru-cache
messages(namespace, args);
let promise;
switch (func) {
case 'max':
case 'maxAge':
case 'stale': {
promise = new Promise((resolve) => {
if (funcArgs[0]) {
lru[func] = funcArgs[0];
}
return resolve(lru[func]);
});
break;
}
case 'decr':
case 'incr': {
promise = new Promise((resolve) => {
// get the current value default to 0
let value = lru.get(funcArgs[0]);
// maybe initialize and increment
value =
(typeof value === 'number' ? value : 0) +
(funcArgs[1] || 1) * (func === 'decr' ? -1 : 1);
// set the new value
lru.set(funcArgs[0], value);
// resolve the new value
return resolve(value);
});
break;
}
case 'mGet': {
promise = new Promise((resolve) => {
const mGetValues = utils.mGet(lru, funcArgs);
return resolve(mGetValues);
});
break;
}
case 'mSet': {
promise = new Promise((resolve) => {
utils.mSet(lru, funcArgs);
return resolve(true);
});
break;
}
case 'mDel': {
promise = new Promise((resolve) => {
utils.mDel(lru, funcArgs);
return resolve(true);
});
break;
}
case 'itemCount':
case 'length': {
// return the property value
promise = new Promise((resolve) => {
return resolve(lru[func]);
});
break;
}
default: {
// just call the function on the lru-cache
promise = new Promise((resolve) => {
return resolve(lru[func](...funcArgs));
});
break;
}
}
return promise;
};
};
module.exports = {
processMessages,
getLruCache,
getPromisified,
getPromisified: (namespace, options) => {
return (...args) => {
// acting on the local lru-cache
messages(namespace, args);
// first argument is the function to run
const func = args[0];
// the rest of the args are the function arguments of N length
const funcArgs = Array.prototype.slice.call(args, 1, args.length);
return getPromiseHandler(func)({
caches,
namespace,
options,
func,
funcArgs,
startPruneCronJob,
});
};
},
};

@@ -12,16 +12,46 @@ const cluster = require('cluster');

const processMessages = function () {
// run on each worker thread
if (cluster.isWorker) {
process.on('message', (response) => {
messages(`Worker ${cluster.worker.id} recieved message`, response);
// look up the callback based on the response ID, delete it, then call it
if (response.source !== config.source || !callbacks[response.id]) return;
const callback = callbacks[response.id];
delete callbacks[response.id];
callback(response);
});
}
};
// run on each worker thread
if (cluster.isWorker) {
process.on('message', (response) => {
messages(`Worker ${cluster.worker.id} recieved message`, response);
// look up the callback based on the response ID, delete it, then call it
if (response.source !== config.source || !callbacks[response.id]) return;
const callback = callbacks[response.id];
delete callbacks[response.id];
callback(response);
});
}
const requestToMaster = (cache, func, funcArgs) =>
new Promise((resolve, reject) => {
// create the request to the master
const request = {
source: config.source,
namespace: cache.namespace,
id: uuid.v4(),
func,
arguments: funcArgs,
};
// if we don't get a response in 100ms, return undefined
let failsafeTimeout = setTimeout(
() => {
failsafeTimeout = null;
return cache.failsafe === 'reject'
? reject(new Error('Timed out in isFailed()'))
: resolve();
},
func === '()' ? 5000 : cache.timeout
);
// set the callback for this id to resolve the promise
callbacks[request.id] = (result) => {
if (failsafeTimeout) {
clearTimeout(failsafeTimeout);
return resolve(result.value);
}
return false;
};
// send the request to the master process
process.send(request);
});
const getPromisified = (cache, options) => {

@@ -37,33 +67,3 @@ // return a promise that resolves to the result of the method on

// cluster.isWorker
return new Promise((resolve, reject) => {
// create the request to the master
const request = {
source: config.source,
namespace: cache.namespace,
id: uuid.v4(),
func,
arguments: funcArgs,
};
// if we don't get a response in 100ms, return undefined
let failsafeTimeout = setTimeout(
() => {
failsafeTimeout = null;
if (cache.failsafe === 'reject') {
return reject(new Error('Timed out in isFailed()'));
}
return resolve();
},
func === '()' ? 5000 : cache.timeout
);
// set the callback for this id to resolve the promise
callbacks[request.id] = (result) => {
if (failsafeTimeout) {
clearTimeout(failsafeTimeout);
return resolve(result.value);
}
return false;
};
// send the request to the master process
process.send(request);
});
return requestToMaster(cache, func, funcArgs);
};

@@ -84,4 +84,3 @@

module.exports = {
processMessages,
getPromisified,
};

@@ -15,8 +15,2 @@ /**

// setup the the master to handle messages for each worker
master.processMessages();
// setup each worker to handle messages from the master
worker.processMessages();
/**

@@ -23,0 +17,0 @@ * LRUCacheForClustersAsPromised roughly approximates the functionality of LRUCache

{
"name": "lru-cache-for-clusters-as-promised",
"version": "1.6.1",
"version": "1.7.0",
"types": "index.d.ts",

@@ -5,0 +5,0 @@ "description": "LRU Cache that is safe for clusters",

@@ -10,7 +10,12 @@ const request = require('supertest');

// run before the tests start
before((done) => {
before(function (done) {
// This will call done with the cluster has forked and the worker is listening
this.timeout(5000);
master = require('./lib/cluster-master')(done);
});
after((done) => {
master.shutdown(done);
});
afterEach((done) => {

@@ -17,0 +22,0 @@ request(`http://${config.server.host}:${config.server.port}`)

const cluster = require('cluster');
const os = require('os');
const path = require('path');
const Debug = require('debug');
const config = require('../../config');
const LRUCache = require('../../');
const debug = new Debug(`${config.source}-test-cluster-master`);
LRUCache.init();

@@ -24,2 +27,3 @@

'none',
// '--include-all-sources',
// output files will have the workers PID in the filename

@@ -40,14 +44,4 @@ '--include-pid',

// for each worker created...
cluster.on('fork', (worker) => {
// wait for the worker to send a message
worker.on('message', (request) => {
if (request === 'hi') {
worker.send('hello');
}
});
});
// create one process per CPU core
const workers = os.cpus().length;
// start one worker to handle the threads
const workers = 1;
for (let i = 0; i < workers; i += 1) {

@@ -71,2 +65,12 @@ cluster.fork();

});
// wait for the worker to send a message
worker.on('message', (request) => {
if (request === 'hi') {
worker.send('hello');
}
});
// wait for the worker to send a message
worker.on('error', (error) => {
debug(error.message);
});
});

@@ -98,3 +102,8 @@ return {

},
shutdown: (done) => {
cluster.disconnect(() => {
done();
});
},
};
};

@@ -11,2 +11,4 @@ const config = require('./test-config');

require('../../lib/worker');
// this will be the SAME cache no matter which module calls it.

@@ -16,3 +18,2 @@ const defaultCache = new LRUCache({

maxAge: 100000,
stale: true,
});

@@ -36,3 +37,3 @@ defaultCache.keys();

if (err) {
return res.send(err);
return res.send(err.message);
}

@@ -39,0 +40,0 @@ return res.send(true);

@@ -10,5 +10,5 @@ module.exports = {

server: {
port: process.env.PORT || 6666,
port: process.env.PORT || 4242,
host: '127.0.0.1',
},
};

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc