@soundxyz/fine-grained-cache
Advanced tools
Comparing version 1.1.0 to 2.0.0
@@ -23,3 +23,25 @@ import type { Redis } from "ioredis"; | ||
}) => T; | ||
export declare function FineGrainedCache({ redis, redLock: redLockConfig, keyPrefix, memoryCache, onError, }: { | ||
export declare const Events: { | ||
readonly REDIS_GET: "REDIS_GET"; | ||
readonly REDIS_GET_TIMED_OUT: "REDIS_GET_TIMED_OUT"; | ||
readonly REDIS_SET: "REDIS_SET"; | ||
readonly REDIS_SKIP_SET: "REDIS_SKIP_SET"; | ||
readonly MEMORY_CACHE_HIT: "MEMORY_CACHE_HIT"; | ||
readonly INVALIDATE_KEY_SCAN: "INVALIDATE_KEY_SCAN"; | ||
readonly INVALIDATED_KEYS: "INVALIDATED_KEYS"; | ||
readonly EXECUTION_TIME: "EXECUTION_TIME"; | ||
readonly PIPELINED_REDIS_GETS: "PIPELINED_REDIS_GETS"; | ||
readonly REDLOCK_ACQUIRED: "REDLOCK_ACQUIRED"; | ||
readonly REDLOCK_RELEASED: "REDLOCK_RELEASED"; | ||
readonly REDLOCK_GET_AFTER_ACQUIRE: "REDLOCK_GET_AFTER_ACQUIRE"; | ||
}; | ||
export declare type Events = typeof Events[keyof typeof Events]; | ||
export declare type EventParamsObject = Record<string, string | number | boolean | null | undefined>; | ||
export declare type LogEventArgs = { | ||
message: string; | ||
code: Events; | ||
params: EventParamsObject; | ||
}; | ||
export declare type LoggedEvents = Partial<Record<Events, string | boolean | null>>; | ||
export declare function FineGrainedCache({ redis, redLock: redLockConfig, keyPrefix, memoryCache, onError, logEvents, GETRedisTimeout, pipelineRedisGET, defaultUseMemoryCache, }: { | ||
redis: Redis; | ||
@@ -38,2 +60,27 @@ redLock?: { | ||
onError?: (err: unknown) => void; | ||
/** | ||
* Enable event logging | ||
*/ | ||
logEvents?: { | ||
log: (args: LogEventArgs) => void; | ||
events: LoggedEvents; | ||
}; | ||
/** | ||
* Set a maximum amount of milliseconds for getCached to wait for the GET redis response | ||
*/ | ||
GETRedisTimeout?: number; | ||
/** | ||
* Enable usage of redis pipelines for redis GET. | ||
* | ||
* If "number" is specified, that's the maximum amount of operations to be sent in a single pipeline | ||
*/ | ||
pipelineRedisGET?: boolean | number; | ||
/** | ||
* Should `getCached` use memory cache by default? | ||
* | ||
* It can be overriden on `getCached` | ||
* | ||
* @default true | ||
*/ | ||
defaultUseMemoryCache?: boolean; | ||
}): { | ||
@@ -40,0 +87,0 @@ getCached: <T>(cb: CachedCallback<T>, { timedInvalidation, ttl, keys, maxExpectedTime, retryLockTime, checkShortMemoryCache, useSuperjson, useRedlock, forceUpdate, }: { |
@@ -9,2 +9,3 @@ 'use strict'; | ||
const utils = require('./utils.js'); | ||
const promises = require('timers/promises'); | ||
@@ -35,2 +36,16 @@ function _interopDefaultLegacy (e) { return e && typeof e === 'object' && 'default' in e ? e : { 'default': e }; } | ||
} | ||
const Events = { | ||
REDIS_GET: "REDIS_GET", | ||
REDIS_GET_TIMED_OUT: "REDIS_GET_TIMED_OUT", | ||
REDIS_SET: "REDIS_SET", | ||
REDIS_SKIP_SET: "REDIS_SKIP_SET", | ||
MEMORY_CACHE_HIT: "MEMORY_CACHE_HIT", | ||
INVALIDATE_KEY_SCAN: "INVALIDATE_KEY_SCAN", | ||
INVALIDATED_KEYS: "INVALIDATED_KEYS", | ||
EXECUTION_TIME: "EXECUTION_TIME", | ||
PIPELINED_REDIS_GETS: "PIPELINED_REDIS_GETS", | ||
REDLOCK_ACQUIRED: "REDLOCK_ACQUIRED", | ||
REDLOCK_RELEASED: "REDLOCK_RELEASED", | ||
REDLOCK_GET_AFTER_ACQUIRE: "REDLOCK_GET_AFTER_ACQUIRE" | ||
}; | ||
function FineGrainedCache({ | ||
@@ -44,3 +59,7 @@ redis, | ||
}), | ||
onError = console.error | ||
onError = console.error, | ||
logEvents, | ||
GETRedisTimeout, | ||
pipelineRedisGET, | ||
defaultUseMemoryCache = true | ||
}) { | ||
@@ -51,8 +70,119 @@ const redLock = redLockConfig?.client; | ||
const useRedlockByDefault = redLockConfig?.useByDefault ?? false; | ||
function getTracing() { | ||
const start = performance.now(); | ||
return () => `${(performance.now() - start).toFixed()}ms`; | ||
} | ||
const enabledLogEvents = logEvents?.events; | ||
const logMessage = logEvents ? function logMessage2(code, params) { | ||
let codeValue = logEvents.events[code]; | ||
if (!codeValue) | ||
return; | ||
if (typeof codeValue !== "string") | ||
codeValue = Events[code]; | ||
let paramsString = ""; | ||
for (const key in params) { | ||
const value = params[key]; | ||
if (value === void 0) | ||
continue; | ||
paramsString += " " + key + "=" + params[key]; | ||
} | ||
logEvents.log({ | ||
code, | ||
message: `[${codeValue}]${paramsString}`, | ||
params | ||
}); | ||
} : () => void 0; | ||
function generateCacheKey(keys) { | ||
return (typeof keys === "string" ? keyPrefix + ":" + keys : keyPrefix + ":" + keys.join(":").replaceAll("*:", "*").replaceAll(":*", "*")).toLowerCase(); | ||
} | ||
let pendingRedisGets = []; | ||
let pendingRedisTimeout; | ||
function pipelinedRedisGet(key) { | ||
if (pendingRedisTimeout !== void 0) { | ||
clearTimeout(pendingRedisTimeout); | ||
} | ||
if (typeof pipelineRedisGET === "number" && pendingRedisGets.length >= pipelineRedisGET) { | ||
executePipeline(); | ||
} | ||
const promise = utils.createDeferredPromise(); | ||
pendingRedisGets.push([key, promise]); | ||
pendingRedisTimeout = setTimeout(executePipeline); | ||
return promise.promise; | ||
async function executePipeline() { | ||
pendingRedisTimeout = void 0; | ||
const size = pendingRedisGets.length; | ||
const { promises, commands } = pendingRedisGets.reduce((acc, [key2, promise2], index) => { | ||
acc.promises[index] = { | ||
promise: promise2, | ||
index | ||
}; | ||
acc.commands[index] = ["get", key2]; | ||
return acc; | ||
}, { | ||
promises: new Array(size), | ||
commands: new Array(size) | ||
}); | ||
const tracing = enabledLogEvents?.PIPELINED_REDIS_GETS ? getTracing() : null; | ||
pendingRedisGets = []; | ||
try { | ||
const pipeline = redis.pipeline(commands); | ||
const results = await pipeline.exec(); | ||
if (tracing) { | ||
logMessage("PIPELINED_REDIS_GETS", { | ||
keys: commands.map(([, key2]) => key2).join(","), | ||
cache: results?.map(([, result]) => typeof result === "string" ? "HIT" : "MISS").join(",") || "null", | ||
size, | ||
time: tracing() | ||
}); | ||
} | ||
for (const { promise: promise2, index } of promises) { | ||
const result = results?.[index]; | ||
if (!result) { | ||
promise2.resolve(null); | ||
} else { | ||
const [error, value] = result; | ||
if (error) { | ||
promise2.reject(error); | ||
} else { | ||
promise2.resolve(typeof value != "string" ? null : value); | ||
} | ||
} | ||
} | ||
} catch (err) { | ||
for (const { promise: promise2 } of promises) { | ||
promise2.reject(err); | ||
} | ||
} | ||
} | ||
} | ||
async function getRedisCacheValue(key, useSuperjson, checkShortMemoryCache) { | ||
const tracing = enabledLogEvents?.REDIS_GET || enabledLogEvents?.REDIS_GET_TIMED_OUT ? getTracing() : null; | ||
let timedOut = void 0; | ||
try { | ||
const redisValue = await redis.get(key); | ||
const redisGet = pipelineRedisGET ? pipelinedRedisGet(key) : redis.get(key).then((value) => { | ||
if (enabledLogEvents?.REDIS_GET) { | ||
logMessage("REDIS_GET", { | ||
key, | ||
cache: value == null ? "MISS" : "HIT", | ||
timedOut, | ||
time: tracing?.() | ||
}); | ||
} | ||
return value; | ||
}, (err) => { | ||
onError(err); | ||
return null; | ||
}); | ||
const redisValue = await (GETRedisTimeout != null ? Promise.race([redisGet, promises.setTimeout(GETRedisTimeout, void 0)]) : redisGet); | ||
if (redisValue === void 0) { | ||
timedOut = true; | ||
if (enabledLogEvents?.REDIS_GET_TIMED_OUT) { | ||
logMessage("REDIS_GET_TIMED_OUT", { | ||
key, | ||
timeout: GETRedisTimeout, | ||
time: tracing?.() | ||
}); | ||
} | ||
return NotFoundSymbol; | ||
} | ||
if (redisValue != null) { | ||
@@ -75,3 +205,3 @@ const parsedRedisValue = useSuperjson ? superjson__default["default"].parse(redisValue) : JSON.parse(redisValue); | ||
retryLockTime = defaultRetryLockTime, | ||
checkShortMemoryCache = timedInvalidation == null, | ||
checkShortMemoryCache = defaultUseMemoryCache, | ||
useSuperjson = true, | ||
@@ -81,6 +211,15 @@ useRedlock = useRedlockByDefault, | ||
}) { | ||
if (checkShortMemoryCache && timedInvalidation != null) { | ||
checkShortMemoryCache = false; | ||
} | ||
const key = generateCacheKey(keys); | ||
if (checkShortMemoryCache && forceUpdate === false) { | ||
if (memoryCache.has(key)) | ||
if (memoryCache.has(key)) { | ||
if (enabledLogEvents?.MEMORY_CACHE_HIT) { | ||
logMessage("MEMORY_CACHE_HIT", { | ||
key | ||
}); | ||
} | ||
return memoryCache.get(key); | ||
} | ||
} | ||
@@ -99,2 +238,3 @@ return ConcurrentCachedCall(key, async () => { | ||
try { | ||
const tracing = enabledLogEvents?.REDLOCK_ACQUIRED ? getTracing() : null; | ||
lock = await redLock.acquire(["lock:" + key], maxLockTime, { | ||
@@ -104,2 +244,9 @@ retryCount, | ||
}); | ||
if (tracing) { | ||
logMessage("REDLOCK_ACQUIRED", { | ||
key, | ||
attempts: lock.attempts.length, | ||
time: tracing() | ||
}); | ||
} | ||
} catch (err) { | ||
@@ -111,6 +258,27 @@ onError(err); | ||
if (lock && lock.attempts.length > 1) { | ||
lock.release().catch(() => null).finally(() => lock = null); | ||
const redisValueAfterLock = await getRedisCacheValue(key, useSuperjson, checkShortMemoryCache); | ||
if (redisValueAfterLock !== NotFoundSymbol) | ||
return redisValueAfterLock; | ||
{ | ||
const tracing = enabledLogEvents?.REDLOCK_RELEASED ? getTracing() : null; | ||
lock.release().then(({ attempts }) => { | ||
if (tracing) { | ||
logMessage("REDLOCK_RELEASED", { | ||
key, | ||
attempts: attempts.length, | ||
time: tracing() | ||
}); | ||
} | ||
}).catch(() => null).finally(() => lock = null); | ||
} | ||
{ | ||
const tracing = enabledLogEvents?.REDLOCK_GET_AFTER_ACQUIRE ? getTracing() : null; | ||
const redisValueAfterLock = await getRedisCacheValue(key, useSuperjson, checkShortMemoryCache); | ||
if (tracing) { | ||
logMessage("REDLOCK_GET_AFTER_ACQUIRE", { | ||
key, | ||
cache: redisValueAfterLock !== NotFoundSymbol ? "HIT" : "MISS", | ||
time: tracing() | ||
}); | ||
} | ||
if (redisValueAfterLock !== NotFoundSymbol) | ||
return redisValueAfterLock; | ||
} | ||
} | ||
@@ -125,2 +293,3 @@ return await getNewValue(); | ||
let expirySeconds = 1; | ||
const tracing = enabledLogEvents?.EXECUTION_TIME ? getTracing() : null; | ||
const newValue = await cb({ | ||
@@ -138,2 +307,8 @@ setTTL(options) { | ||
}); | ||
if (tracing) { | ||
logMessage("EXECUTION_TIME", { | ||
key, | ||
time: tracing() | ||
}); | ||
} | ||
try { | ||
@@ -145,5 +320,28 @@ const timedInvalidationDate = currentTimedInvalidation ? typeof currentTimedInvalidation === "function" ? await currentTimedInvalidation() : currentTimedInvalidation : null; | ||
if (expirySeconds > 0) { | ||
const tracing2 = enabledLogEvents?.REDIS_SET ? getTracing() : null; | ||
await redis.setex(key, expirySeconds, stringifiedValue); | ||
if (tracing2) { | ||
logMessage("REDIS_SET", { | ||
key, | ||
expirySeconds, | ||
timedInvalidationDate: timedInvalidationDate?.toISOString(), | ||
time: tracing2() | ||
}); | ||
} | ||
} else if (ttl === "Infinity") { | ||
const tracing2 = enabledLogEvents?.REDIS_SET ? getTracing() : null; | ||
await redis.set(key, stringifiedValue); | ||
if (tracing2) { | ||
logMessage("REDIS_SET", { | ||
key, | ||
expirySeconds: "Infinity", | ||
timedInvalidationDate: timedInvalidationDate?.toISOString(), | ||
time: tracing2() | ||
}); | ||
} | ||
} else if (enabledLogEvents?.REDIS_SKIP_SET) { | ||
logMessage("REDIS_SKIP_SET", { | ||
key, | ||
timedInvalidationDate: timedInvalidationDate?.toISOString() | ||
}); | ||
} | ||
@@ -162,5 +360,26 @@ } catch (err) { | ||
const key = generateCacheKey(keys); | ||
const keysToInvalidate = key.includes("*") ? await redis.keys(key) : [key]; | ||
let keysToInvalidate; | ||
if (key.includes("*")) { | ||
const tracing = enabledLogEvents?.INVALIDATE_KEY_SCAN ? getTracing() : null; | ||
keysToInvalidate = await redis.keys(key); | ||
if (tracing) { | ||
logMessage("INVALIDATE_KEY_SCAN", { | ||
key, | ||
keysToInvalidate: keysToInvalidate.join(",") || "null", | ||
time: tracing() | ||
}); | ||
} | ||
} else { | ||
keysToInvalidate = [key]; | ||
} | ||
if (keysToInvalidate.length) { | ||
const tracing = enabledLogEvents?.INVALIDATED_KEYS ? getTracing() : null; | ||
await redis.del(keysToInvalidate); | ||
if (tracing) { | ||
logMessage("INVALIDATED_KEYS", { | ||
key, | ||
invalidatedKeys: keysToInvalidate.join(",") || "null", | ||
time: tracing() | ||
}); | ||
} | ||
} | ||
@@ -177,2 +396,3 @@ } | ||
exports.Events = Events; | ||
exports.FineGrainedCache = FineGrainedCache; |
@@ -9,2 +9,3 @@ 'use strict'; | ||
exports.Events = fineGrained.Events; | ||
exports.FineGrainedCache = fineGrained.FineGrainedCache; |
export declare function getRemainingSeconds(date: Date): number; | ||
export declare class PLazy<ValueType> extends Promise<ValueType> { | ||
private _executor; | ||
private _promise?; | ||
constructor(_executor: (resolve: (value: ValueType) => void, reject: (err: unknown) => void) => void); | ||
then: Promise<ValueType>["then"]; | ||
catch: Promise<ValueType>["catch"]; | ||
finally: Promise<ValueType>["finally"]; | ||
} | ||
export declare function LazyPromise<Value>(fn: () => Value | Promise<Value>): Promise<Value>; | ||
export interface DeferredPromise<T> { | ||
promise: Promise<T>; | ||
resolve: (value: T) => void; | ||
reject: (reason: unknown) => void; | ||
} | ||
export declare function createDeferredPromise<T = void>(): DeferredPromise<T>; |
@@ -8,3 +8,53 @@ 'use strict'; | ||
} | ||
class PLazy extends Promise { | ||
constructor(_executor) { | ||
super((resolve) => resolve()); | ||
this._executor = _executor; | ||
this.then = (onFulfilled, onRejected) => (this._promise ||= new Promise(this._executor)).then(onFulfilled, onRejected); | ||
this.catch = (onRejected) => (this._promise ||= new Promise(this._executor)).catch(onRejected); | ||
this.finally = (onFinally) => (this._promise ||= new Promise(this._executor)).finally(onFinally); | ||
} | ||
} | ||
function LazyPromise(fn) { | ||
return new PLazy((resolve, reject) => { | ||
try { | ||
Promise.resolve(fn()).then(resolve, reject); | ||
} catch (err) { | ||
reject(err); | ||
} | ||
}); | ||
} | ||
function createDeferredPromise() { | ||
const resolve = (value) => { | ||
middlePromiseResolve({ | ||
value, | ||
resolved: true | ||
}); | ||
}; | ||
const reject = (err) => { | ||
middlePromiseResolve({ | ||
value: err, | ||
resolved: false | ||
}); | ||
}; | ||
let middlePromiseResolve; | ||
const MiddlePromise = new Promise((resolve2) => { | ||
middlePromiseResolve = resolve2; | ||
}); | ||
const promise = LazyPromise(async () => { | ||
const { resolved, value } = await MiddlePromise; | ||
if (resolved) | ||
return value; | ||
throw value; | ||
}); | ||
return { | ||
promise, | ||
resolve, | ||
reject | ||
}; | ||
} | ||
exports.LazyPromise = LazyPromise; | ||
exports.PLazy = PLazy; | ||
exports.createDeferredPromise = createDeferredPromise; | ||
exports.getRemainingSeconds = getRemainingSeconds; |
{ | ||
"name": "@soundxyz/fine-grained-cache", | ||
"version": "1.1.0", | ||
"version": "2.0.0", | ||
"description": "Fine-grained cache helper using redis", | ||
@@ -62,5 +62,9 @@ "keywords": [ | ||
}, | ||
"engines": { | ||
"node": ">=16" | ||
}, | ||
"scripts": { | ||
"release": "changeset publish", | ||
"test": "c8 --include=src --exclude=src/promise.ts ava", | ||
"test:html": "c8 -r=lcov --include=src --exclude=src/promise.ts ava && pnpm dlx serve coverage/lcov-report", | ||
"test:watch": "ava --watch", | ||
@@ -67,0 +71,0 @@ "test:watch:coverage": "bob-watch -w src test package.json -c \"pnpm test\"" |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
37480
996