lru-cache-for-clusters-as-promised
Advanced tools
Comparing version 1.6.1 to 1.7.0
@@ -0,1 +1,8 @@ | ||
1.7.0 / 2021-03-25 | ||
================== | ||
* Refactoring for maintainability | ||
* Update dependencies | ||
* More reliable test coverage | ||
1.6.1 / 2021-03-20 | ||
@@ -2,0 +9,0 @@ ================== |
@@ -6,5 +6,7 @@ const cluster = require('cluster'); | ||
const utils = require('./utils'); | ||
const config = require('../config'); | ||
const utils = require('./utils'); | ||
const masterMessages = require('./master-messages'); | ||
const debug = new Debug(`${config.source}-master`); | ||
@@ -16,17 +18,2 @@ const messages = new Debug(`${config.source}-messages`); | ||
function getLruCache(cache, options) { | ||
if (caches[cache.namespace]) { | ||
debug(`Loaded cache from shared namespace ${cache.namespace}`); | ||
return caches[cache.namespace]; | ||
} | ||
const lru = new LRUCache(options); | ||
caches[cache.namespace] = lru; | ||
if (options.prune) { | ||
lru.job = startPruneCronJob(lru, options.prune, cache.namespace); | ||
} | ||
debug(`Created new LRU cache ${cache.namespace}`); | ||
return lru; | ||
} | ||
/** | ||
@@ -54,140 +41,120 @@ * Starts a cron job to prune stale objects from the cache | ||
const processMessages = function () { | ||
if (cluster.isMaster) { | ||
// for each worker created... | ||
cluster.on('fork', (worker) => { | ||
// wait for the worker to send a message | ||
worker.on('message', (request) => { | ||
if (request.source !== config.source) return; | ||
messages(`Master recieved message from worker ${worker.id}`, request); | ||
// this code will only run on the master to set up handles for messages from the workers | ||
if (cluster.isMaster) { | ||
// for each worker created... | ||
cluster.on('fork', (worker) => { | ||
// wait for the worker to send a message | ||
worker.on('message', (request) => { | ||
if (request.source !== config.source) return; | ||
messages(`Master recieved message from worker ${worker.id}`, request); | ||
return masterMessages.getMessageHandler(request.func)( | ||
caches, | ||
request, | ||
worker, | ||
{ | ||
startPruneCronJob, | ||
} | ||
); | ||
}); | ||
}); | ||
} | ||
// try to load an existing lru-cache | ||
const lru = caches[request.namespace]; | ||
function getLruCache(caches, cache, options, startPruneCronJob) { | ||
if (caches[cache.namespace]) { | ||
debug(`Loaded cache from shared namespace ${cache.namespace}`); | ||
return caches[cache.namespace]; | ||
} | ||
const params = request.arguments; | ||
const lru = new LRUCache(options); | ||
caches[cache.namespace] = lru; | ||
if (options.prune && startPruneCronJob) { | ||
lru.job = startPruneCronJob(lru, options.prune, cache.namespace); | ||
} | ||
debug(`Created new LRU cache ${cache.namespace}`); | ||
return lru; | ||
} | ||
/** | ||
* Sends the response back to the worker thread | ||
* @param {Object} data The response from the cache | ||
* @param {Object} request The request | ||
* @param {Object} worker The worker sending the response | ||
*/ | ||
function sendResponseToWorker(data) { | ||
const response = data; | ||
response.source = config.source; | ||
response.id = request.id; | ||
response.func = request.func; | ||
messages(`Master sending response to worker ${worker.id}`, response); | ||
worker.send(response); | ||
} | ||
const getCacheConfigValue = ({ | ||
caches, | ||
namespace, | ||
options, | ||
func, | ||
funcArgs, | ||
}) => { | ||
const lru = getLruCache(caches, namespace, options); | ||
return new Promise((resolve) => { | ||
if (funcArgs[0]) { | ||
lru[func] = funcArgs[0]; | ||
} | ||
return resolve(lru[func]); | ||
}); | ||
}; | ||
const constructLruCache = function () { | ||
let created = false; | ||
let lru = caches[request.namespace]; | ||
const options = params[0]; | ||
// create a new lru-cache, give it a namespace, and save it locally | ||
if (caches[request.namespace]) { | ||
lru = caches[request.namespace]; | ||
// update property values as needed | ||
['max', 'maxAge', 'stale'].forEach((prop) => { | ||
if (options[prop] && options[prop] !== lru[prop]) { | ||
lru[prop] = options[prop]; | ||
} | ||
}); | ||
} else { | ||
created = true; | ||
lru = caches[request.namespace] = new LRUCache(...params); | ||
// start a job to clean the cache | ||
if (params[0].prune) { | ||
lru.job = startPruneCronJob( | ||
lru, | ||
params[0].prune, | ||
request.namespace | ||
); | ||
} | ||
} | ||
sendResponseToWorker({ | ||
value: { | ||
namespace: request.namespace, | ||
isnew: created, | ||
max: lru.max, | ||
maxAge: lru.maxAge, | ||
stale: lru.stale, | ||
}, | ||
}); | ||
}; | ||
const incrementOrDecrement = ({ | ||
caches, | ||
namespace, | ||
options, | ||
func, | ||
funcArgs, | ||
startPruneCronJob, | ||
}) => { | ||
const lru = getLruCache(caches, namespace, options, startPruneCronJob); | ||
return new Promise((resolve) => { | ||
// get the current value default to 0 | ||
let value = lru.get(funcArgs[0]); | ||
// maybe initialize and increment | ||
value = | ||
(typeof value === 'number' ? value : 0) + | ||
(funcArgs[1] || 1) * (func === 'decr' ? -1 : 1); | ||
// set the new value | ||
lru.set(funcArgs[0], value); | ||
// resolve the new value | ||
return resolve(value); | ||
}); | ||
}; | ||
const getCacheConfigValue = function () { | ||
if (params[0]) { | ||
lru[request.func] = params[0]; | ||
} | ||
return sendResponseToWorker({ | ||
value: lru[request.func], | ||
}); | ||
}; | ||
const getMultipleValues = (options) => handleMultipleValues('mGet', options); | ||
const setMultipleValues = (options) => handleMultipleValues('mSet', options); | ||
const deleteMultipleValues = (options) => handleMultipleValues('mDel', options); | ||
const incrementOrDecrement = function () { | ||
// get the current value | ||
let value = lru.get(params[0]); | ||
// maybe initialize and increment | ||
value = | ||
(typeof value === 'number' ? value : 0) + | ||
(params[1] || 1) * (request.func === 'decr' ? -1 : 1); | ||
// set the new value | ||
lru.set(params[0], value); | ||
// send the new value | ||
return sendResponseToWorker({ | ||
value, | ||
}); | ||
}; | ||
const handleMultipleValues = ( | ||
func, | ||
{ namespace, options, funcArgs, startPruneCronJob } | ||
) => { | ||
const lru = getLruCache(caches, namespace, options, startPruneCronJob); | ||
return new Promise((resolve) => { | ||
return resolve(utils[func](lru, funcArgs)); | ||
}); | ||
}; | ||
const getMultipleValues = function () { | ||
const mGetValues = utils.mGet(lru, params); | ||
return sendResponseToWorker({ value: mGetValues }); | ||
}; | ||
const defaultLruFunction = ({ | ||
caches, | ||
namespace, | ||
options, | ||
func, | ||
funcArgs, | ||
startPruneCronJob, | ||
}) => { | ||
const lru = getLruCache(caches, namespace, options, startPruneCronJob); | ||
return new Promise((resolve) => { | ||
return resolve(lru[func](...funcArgs)); | ||
}); | ||
}; | ||
const setMultipleValues = function () { | ||
utils.mSet(lru, params); | ||
return sendResponseToWorker({ value: true }); | ||
}; | ||
const deleteMultipleValues = function () { | ||
utils.mDel(lru, params); | ||
return sendResponseToWorker({ value: true }); | ||
}; | ||
const promiseHandlerFunctions = { | ||
mGet: getMultipleValues, | ||
mSet: setMultipleValues, | ||
mDel: deleteMultipleValues, | ||
decr: incrementOrDecrement, | ||
incr: incrementOrDecrement, | ||
max: getCacheConfigValue, | ||
maxAge: getCacheConfigValue, | ||
stale: getCacheConfigValue, | ||
itemCount: getCacheConfigValue, | ||
length: getCacheConfigValue, | ||
}; | ||
switch (request.func) { | ||
// constructor request | ||
case '()': { | ||
return constructLruCache(); | ||
} | ||
case 'max': | ||
case 'maxAge': | ||
case 'stale': | ||
case 'length': | ||
case 'itemCount': { | ||
return getCacheConfigValue(); | ||
} | ||
case 'decr': | ||
case 'incr': { | ||
return incrementOrDecrement(); | ||
} | ||
case 'mGet': { | ||
return getMultipleValues(); | ||
} | ||
case 'mSet': { | ||
return setMultipleValues(); | ||
} | ||
case 'mDel': { | ||
return deleteMultipleValues(); | ||
} | ||
// return the function value | ||
default: { | ||
return sendResponseToWorker({ | ||
value: lru[request.func](...params), | ||
}); | ||
} | ||
} | ||
}); | ||
}); | ||
} | ||
const getPromiseHandler = (func) => { | ||
const handler = promiseHandlerFunctions[func]; | ||
return handler ? handler : defaultLruFunction; | ||
}; | ||
@@ -198,85 +165,21 @@ | ||
// lru-cache on the master thread if this is a worker | ||
const getPromisified = (namespace, options) => { | ||
const lru = getLruCache(namespace, options); | ||
return (...args) => { | ||
// first argument is the function to run | ||
const func = args[0]; | ||
// the rest of the args are the function arguments of N length | ||
const funcArgs = Array.prototype.slice.call(args, 1, args.length); | ||
// acting on the local lru-cache | ||
messages(namespace, args); | ||
let promise; | ||
switch (func) { | ||
case 'max': | ||
case 'maxAge': | ||
case 'stale': { | ||
promise = new Promise((resolve) => { | ||
if (funcArgs[0]) { | ||
lru[func] = funcArgs[0]; | ||
} | ||
return resolve(lru[func]); | ||
}); | ||
break; | ||
} | ||
case 'decr': | ||
case 'incr': { | ||
promise = new Promise((resolve) => { | ||
// get the current value default to 0 | ||
let value = lru.get(funcArgs[0]); | ||
// maybe initialize and increment | ||
value = | ||
(typeof value === 'number' ? value : 0) + | ||
(funcArgs[1] || 1) * (func === 'decr' ? -1 : 1); | ||
// set the new value | ||
lru.set(funcArgs[0], value); | ||
// resolve the new value | ||
return resolve(value); | ||
}); | ||
break; | ||
} | ||
case 'mGet': { | ||
promise = new Promise((resolve) => { | ||
const mGetValues = utils.mGet(lru, funcArgs); | ||
return resolve(mGetValues); | ||
}); | ||
break; | ||
} | ||
case 'mSet': { | ||
promise = new Promise((resolve) => { | ||
utils.mSet(lru, funcArgs); | ||
return resolve(true); | ||
}); | ||
break; | ||
} | ||
case 'mDel': { | ||
promise = new Promise((resolve) => { | ||
utils.mDel(lru, funcArgs); | ||
return resolve(true); | ||
}); | ||
break; | ||
} | ||
case 'itemCount': | ||
case 'length': { | ||
// return the property value | ||
promise = new Promise((resolve) => { | ||
return resolve(lru[func]); | ||
}); | ||
break; | ||
} | ||
default: { | ||
// just call the function on the lru-cache | ||
promise = new Promise((resolve) => { | ||
return resolve(lru[func](...funcArgs)); | ||
}); | ||
break; | ||
} | ||
} | ||
return promise; | ||
}; | ||
}; | ||
module.exports = { | ||
processMessages, | ||
getLruCache, | ||
getPromisified, | ||
getPromisified: (namespace, options) => { | ||
return (...args) => { | ||
// acting on the local lru-cache | ||
messages(namespace, args); | ||
// first argument is the function to run | ||
const func = args[0]; | ||
// the rest of the args are the function arguments of N length | ||
const funcArgs = Array.prototype.slice.call(args, 1, args.length); | ||
return getPromiseHandler(func)({ | ||
caches, | ||
namespace, | ||
options, | ||
func, | ||
funcArgs, | ||
startPruneCronJob, | ||
}); | ||
}; | ||
}, | ||
}; |
@@ -12,16 +12,46 @@ const cluster = require('cluster'); | ||
const processMessages = function () { | ||
// run on each worker thread | ||
if (cluster.isWorker) { | ||
process.on('message', (response) => { | ||
messages(`Worker ${cluster.worker.id} recieved message`, response); | ||
// look up the callback based on the response ID, delete it, then call it | ||
if (response.source !== config.source || !callbacks[response.id]) return; | ||
const callback = callbacks[response.id]; | ||
delete callbacks[response.id]; | ||
callback(response); | ||
}); | ||
} | ||
}; | ||
// run on each worker thread | ||
if (cluster.isWorker) { | ||
process.on('message', (response) => { | ||
messages(`Worker ${cluster.worker.id} recieved message`, response); | ||
// look up the callback based on the response ID, delete it, then call it | ||
if (response.source !== config.source || !callbacks[response.id]) return; | ||
const callback = callbacks[response.id]; | ||
delete callbacks[response.id]; | ||
callback(response); | ||
}); | ||
} | ||
const requestToMaster = (cache, func, funcArgs) => | ||
new Promise((resolve, reject) => { | ||
// create the request to the master | ||
const request = { | ||
source: config.source, | ||
namespace: cache.namespace, | ||
id: uuid.v4(), | ||
func, | ||
arguments: funcArgs, | ||
}; | ||
// if we don't get a response in 100ms, return undefined | ||
let failsafeTimeout = setTimeout( | ||
() => { | ||
failsafeTimeout = null; | ||
return cache.failsafe === 'reject' | ||
? reject(new Error('Timed out in isFailed()')) | ||
: resolve(); | ||
}, | ||
func === '()' ? 5000 : cache.timeout | ||
); | ||
// set the callback for this id to resolve the promise | ||
callbacks[request.id] = (result) => { | ||
if (failsafeTimeout) { | ||
clearTimeout(failsafeTimeout); | ||
return resolve(result.value); | ||
} | ||
return false; | ||
}; | ||
// send the request to the master process | ||
process.send(request); | ||
}); | ||
const getPromisified = (cache, options) => { | ||
@@ -37,33 +67,3 @@ // return a promise that resolves to the result of the method on | ||
// cluster.isWorker | ||
return new Promise((resolve, reject) => { | ||
// create the request to the master | ||
const request = { | ||
source: config.source, | ||
namespace: cache.namespace, | ||
id: uuid.v4(), | ||
func, | ||
arguments: funcArgs, | ||
}; | ||
// if we don't get a response in 100ms, return undefined | ||
let failsafeTimeout = setTimeout( | ||
() => { | ||
failsafeTimeout = null; | ||
if (cache.failsafe === 'reject') { | ||
return reject(new Error('Timed out in isFailed()')); | ||
} | ||
return resolve(); | ||
}, | ||
func === '()' ? 5000 : cache.timeout | ||
); | ||
// set the callback for this id to resolve the promise | ||
callbacks[request.id] = (result) => { | ||
if (failsafeTimeout) { | ||
clearTimeout(failsafeTimeout); | ||
return resolve(result.value); | ||
} | ||
return false; | ||
}; | ||
// send the request to the master process | ||
process.send(request); | ||
}); | ||
return requestToMaster(cache, func, funcArgs); | ||
}; | ||
@@ -84,4 +84,3 @@ | ||
module.exports = { | ||
processMessages, | ||
getPromisified, | ||
}; |
@@ -15,8 +15,2 @@ /** | ||
// setup the the master to handle messages for each worker | ||
master.processMessages(); | ||
// setup each worker to handle messages from the master | ||
worker.processMessages(); | ||
/** | ||
@@ -23,0 +17,0 @@ * LRUCacheForClustersAsPromised roughly approximates the functionality of LRUCache |
{ | ||
"name": "lru-cache-for-clusters-as-promised", | ||
"version": "1.6.1", | ||
"version": "1.7.0", | ||
"types": "index.d.ts", | ||
@@ -5,0 +5,0 @@ "description": "LRU Cache that is safe for clusters", |
@@ -10,7 +10,12 @@ const request = require('supertest'); | ||
// run before the tests start | ||
before((done) => { | ||
before(function (done) { | ||
// This will call done with the cluster has forked and the worker is listening | ||
this.timeout(5000); | ||
master = require('./lib/cluster-master')(done); | ||
}); | ||
after((done) => { | ||
master.shutdown(done); | ||
}); | ||
afterEach((done) => { | ||
@@ -17,0 +22,0 @@ request(`http://${config.server.host}:${config.server.port}`) |
const cluster = require('cluster'); | ||
const os = require('os'); | ||
const path = require('path'); | ||
const Debug = require('debug'); | ||
const config = require('../../config'); | ||
const LRUCache = require('../../'); | ||
const debug = new Debug(`${config.source}-test-cluster-master`); | ||
LRUCache.init(); | ||
@@ -24,2 +27,3 @@ | ||
'none', | ||
// '--include-all-sources', | ||
// output files will have the workers PID in the filename | ||
@@ -40,14 +44,4 @@ '--include-pid', | ||
// for each worker created... | ||
cluster.on('fork', (worker) => { | ||
// wait for the worker to send a message | ||
worker.on('message', (request) => { | ||
if (request === 'hi') { | ||
worker.send('hello'); | ||
} | ||
}); | ||
}); | ||
// create one process per CPU core | ||
const workers = os.cpus().length; | ||
// start one worker to handle the threads | ||
const workers = 1; | ||
for (let i = 0; i < workers; i += 1) { | ||
@@ -71,2 +65,12 @@ cluster.fork(); | ||
}); | ||
// wait for the worker to send a message | ||
worker.on('message', (request) => { | ||
if (request === 'hi') { | ||
worker.send('hello'); | ||
} | ||
}); | ||
// wait for the worker to send a message | ||
worker.on('error', (error) => { | ||
debug(error.message); | ||
}); | ||
}); | ||
@@ -98,3 +102,8 @@ return { | ||
}, | ||
shutdown: (done) => { | ||
cluster.disconnect(() => { | ||
done(); | ||
}); | ||
}, | ||
}; | ||
}; |
@@ -11,2 +11,4 @@ const config = require('./test-config'); | ||
require('../../lib/worker'); | ||
// this will be the SAME cache no matter which module calls it. | ||
@@ -16,3 +18,2 @@ const defaultCache = new LRUCache({ | ||
maxAge: 100000, | ||
stale: true, | ||
}); | ||
@@ -36,3 +37,3 @@ defaultCache.keys(); | ||
if (err) { | ||
return res.send(err); | ||
return res.send(err.message); | ||
} | ||
@@ -39,0 +40,0 @@ return res.send(true); |
@@ -10,5 +10,5 @@ module.exports = { | ||
server: { | ||
port: process.env.PORT || 6666, | ||
port: process.env.PORT || 4242, | ||
host: '127.0.0.1', | ||
}, | ||
}; |
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
66952
26
1402