# LevelTwo
An in-process cache for asynchronous data fetching to enable instantaneous results.
Configuring an external message broker (Redis, Kafka, RabbitMQ, etc.) will enable a distributed cache by signaling when entries should be updated or deleted. This keeps data in sync across multiple instances of the application by always having the most up-to-date value.

Configuring an external cache (Redis, Memcached, etc.) will enable a "longer term" cache that can persist across application restarts and reduce database load in volatile environments.
Review the list of supported extensions below for easy configuration of common brokers and caches.
## Usage
```ts
const levelTwo = new LevelTwo({
  cacheDefaults: {
    ttl: 60 * 1000,
  },
});

const worker = levelTwo.createWorker("customer", async (ids, earlyWrite) => {
  const rows = await mysql.query(
    "SELECT id, name FROM customers WHERE id IN (?)",
    [ids]
  );

  rows.forEach((row) => earlyWrite(row.id, row));
});

export async function getCustomer(id: string): Promise<Customer> {
  return await worker.get(id);
}

export async function getCustomerList(ids: string[]): Promise<Customer[]> {
  return await worker.getMulti(ids);
}
```
Only the first request for a specific customer reaches the database; every subsequent request for that same identifier pulls from the worker cache (in-process memory) and returns immediately for the duration of the configured TTL.

Even for simple data fetching like the customer lookup above, the local cache allows applications to handle significantly larger request loads. The following benchmark was run on a 2022 MacBook Air M2.
|  | 5 entries | 25 entries | 50 entries | 100 entries |
| --- | --- | --- | --- | --- |
| MySQL Direct Fetching | 11,131 ops/sec ±1.79% | 8,855 ops/sec ±0.29% | 6,984 ops/sec ±0.86% | 5,439 ops/sec ±0.83% |
| Redis Cache Fetching | 34,219 ops/sec ±1.61% | 22,107 ops/sec ±1.29% | 15,336 ops/sec ±0.10% | 10,704 ops/sec ±0.94% |
| Using LevelTwo | 6,945,483 ops/sec ±1.88% | 3,558,506 ops/sec ±0.12% | 2,351,602 ops/sec ±0.11% | 1,403,223 ops/sec ±0.91% |
## How It Works
The core of LevelTwo is built around batch fetching of data, leveraging BurstValve to reduce concurrency down to a single active request for a given identifier per application instance.
```ts
const [github, npm] = await worker.getMulti(["github", "npm"]);

const [req1, req2] = await Promise.all([
  worker.getMulti(["github", "circleci", "jira", "figma"]),
  worker.getMulti(["circleci", "figma", "npm", "slack"]),
]);
```

- GitHub & NPM are already saved to the worker cache (in-process)
- Req1: A data fetch is triggered for CircleCI, Jira, & Figma
- Req2: CircleCI & Figma are already active, so a listener is added to await their results
- Req2: A single data fetch is triggered for Slack
- Results are collected and returned in the order the unique identifiers were passed for each request
## Worker Fetch Process
The fetch process is how a developer implements the database (or external service) fetching of cacheable data.
```ts
type WorkerFetchProcess = (
  identifiers: IdentifierType[],
  earlyWrite: (
    id: IdentifierType,
    result: ResultType | Error | undefined
  ) => Promise<void>
) => Promise<
  | Map<IdentifierType, ResultType | Error | undefined>
  | Array<ResultType | Error | undefined>
  | void
>;
```
There are three formats for returning data to the worker to be cached:

- Recommended: Use the `earlyWrite` mechanism to send results as they become available
- Return a `Map` object with the results mapped to their ids (sketched below)
- Return an array of results that lines up exactly with the corresponding identifiers passed in, returning `undefined` where data could not be found

Returning `undefined` or an `Error` instance in place of a result object signals to the worker that nothing should be cached.
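As a minimal sketch of the `Map` return format, reusing the hypothetical `mysql` call and `Customer` shape from the usage example above:

```ts
const worker = levelTwo.createWorker("customer", async (ids) => {
  const rows = await mysql.query(
    "SELECT id, name FROM customers WHERE id IN (?)",
    [ids]
  );

  // Seed every requested id with undefined so missing rows are
  // explicitly marked "not found" and left uncached.
  const results = new Map<string, Customer | undefined>();
  ids.forEach((id) => results.set(id, undefined));
  rows.forEach((row) => results.set(row.id, row));

  return results;
});
```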
## Cache Options
### `ttl`

Amount of time, in milliseconds, until a cache entry is stale. This value is passed to the remote cache instance when setting new values through a worker. Defaults to one minute.

### `ttlLocal`

Amount of time, in milliseconds, until a local cache entry is stale. This value applies only to local cache entries and is not passed to remote cache instances. Defaults to the value of `ttl`.

### `minimumRequestsForCache`

Minimum number of requests that must be made before an entry can be added to the local cache. Defaults to none.

### `minimumRequestsTimeThreshold`

Number of milliseconds between now and the `minimumRequestsForCache`-th most recent request within which the entry qualifies for the local cache. Defaults to the length of the `ttl`.
For example, with `minimumRequestsForCache: 5` and `minimumRequestsTimeThreshold: 1000`, a request must be made five times for the same unique identifier within the last second before the result value is added to the local cache.
### `maximumCacheKeys`

Maximum number of cache keys that can exist in the local cache. Defaults to none.

### `staleCacheThreshold`

Number of milliseconds a stale local cache entry may continue to be used while an updated value is fetched. Defaults to none.

### `ignoreCacheFetchErrors`

Indicates whether exceptions raised by the remote cache should be ignored; when enabled, the affected ids are sent to the worker instead of an error being returned for each id. Defaults to false.
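Pulling the options above together, a configuration sketch (this assumes all of these options are accepted under `cacheDefaults`; the values are illustrative):

```ts
const levelTwo = new LevelTwo({
  cacheDefaults: {
    ttl: 5 * 60 * 1000, // remote cache entries stale after five minutes
    ttlLocal: 60 * 1000, // local entries stale after one minute
    minimumRequestsForCache: 5, // require five requests ...
    minimumRequestsTimeThreshold: 1000, // ... within one second before caching locally
    maximumCacheKeys: 10_000, // cap the local cache size
    staleCacheThreshold: 30 * 1000, // serve stale entries for up to 30s while refreshing
    ignoreCacheFetchErrors: true, // fall back to the worker when the remote cache errors
  },
});
```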
## Worker API
Namespaced workers are used to fetch and cache data for a specific object type. The worker instance itself is an Iterable that can be looped over to investigate all valid cache entries.
### `get(id: IdentifierType)`

Shortcut method for batch fetching a single object. Any error is thrown instead of returned.

### `getMulti(ids: IdentifierType[])`

Gets a list of values for the identifiers provided. First attempts to find values in the local cache, then falls back to the remote cache if defined, and finally pulls from the worker process.

Identifiers not found in the local cache are then run through a BurstValve to batch fetch from the remote cache or worker process. This means there will only ever be one active request per identifier for a given worker.

### `getUnsafeMulti(ids: IdentifierType[])`

Similar to getMulti, fetches a list of values for the identifiers provided, but throws exceptions when they are encountered instead of returning errors in the results.

### `stream(ids: IdentifierType[], streamResultCallback: (id: IdentifierType, result: ResultType) => Promise<void>)`

Exposes data as it becomes available for the unique identifiers requested.
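A hypothetical usage of `stream` with the customer worker from the usage example:

```ts
// Handle each customer as soon as its data becomes available,
// rather than waiting for the full batch to resolve.
await worker.stream(["1", "2", "3"], async (id, customer) => {
  console.log(`customer ${id} ready`, customer);
});
```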
### `upsert(id: IdentifierType, skipRemoteCache?: boolean)`

Shortcut method for upserting a single object. Any error is thrown instead of returned.

### `upsertMulti(ids: IdentifierType[], skipRemoteCache?: boolean)`

Bypasses the local cache and burst valve, fetching raw data directly from the remote cache or fetcher process. When skipRemoteCache is enabled, results are pulled directly from the fetcher process and set back into the remote cache (successful fetches only).
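As a hypothetical flow, an upsert could follow a database write to refresh caches (the `mysql` call mirrors the usage example above):

```ts
await mysql.query("UPDATE customers SET name = ? WHERE id = ?", ["Jane", "42"]);

// Re-fetch the fresh row, bypassing the local cache and burst valve.
await worker.upsert("42");
```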
### `set(id: IdentifierType, value: ResultType, ttl?: number)`

Shortcut method for setting a single entry.

### `setMulti(entries: CacheEntry[])`

Sets cache entries in both the remote and local caches, then signals all other systems to update their local caches.

### `delete(id: IdentifierType)`

Shortcut method for deleting a single entry.

### `deleteMulti(ids: IdentifierType[])`

Deletes cache entries in both the remote and local caches, then signals all other systems to delete the entries from their local caches if they exist.

### `touch(id: IdentifierType, ttl?: number)`

Shortcut method for touching a single entry.

### `touchMulti(entries: CacheTouch[])`

Extends the ttl on the ids provided, ignoring any ids that don't exist. A signal is sent to all workers, and the remote cache is touched in parallel.

### `exists(id: IdentifierType, skipLocalCache?: boolean)`

Shortcut method for checking the existence of a single id.

### `existsMulti(ids: IdentifierType[], skipLocalCache?: boolean)`

Identifies if the list of unique identifiers exists in either the local cache or the remote cache.

### `broadcast(action: "upsert" | "delete", ids: IdentifierType[])`

Broadcasts an action to all workers connected through the message broker.

### `size()`

Returns the number of valid entries in the cache.

### `has(id: IdentifierType)`

Indicates if the identifier exists in the local cache only.

### `hasMulti(ids: IdentifierType[])`

Indicates if the identifiers exist in the local cache only.

### `keys()`

Returns a list of valid keys in the cache.

### `values()`

Returns a list of valid cache entry values in the cache.

### `entries()`

Returns a list of valid cache entries in the cache.

### `forEach(iter: (value: ResultType, id: IdentifierType) => void)`

Executes the provided function once for each valid id/value pair in the cache.

### `prefill(entries: CacheEntry[])`

Inserts the provided list of entries into the local cache only, overwriting any existing entries.

### `peek(id: IdentifierType)`

Returns the local cache entry for the unique identifier provided, or undefined if the entry does not exist.

### `peekMulti(ids: IdentifierType[])`

Returns local cache entries for the list of unique identifiers provided, with undefined for any entry that does not exist.

### `prune()`

Deletes any expired (past stale) entries from the local cache, as well as any expired minimum request count trackers.

### `clear()`

Clears the local cache of all entries.
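Since the worker is itself an Iterable (as noted above), the local cache can be inspected directly. A sketch, assuming iteration yields `[id, value]` pairs:

```ts
// Walk every valid entry currently held in the local cache.
for (const [id, customer] of worker) {
  console.log(id, customer);
}

// Equivalent inspection through the documented helpers.
console.log(worker.size(), "entries cached locally");
worker.forEach((customer, id) => console.log(id, customer));
```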
## Message Broker (Keeping Workers in Sync)

While workers provide optimal performance in a single-server setup, a key benefit of Redis and Memcached is that the most recently cached data is available to every app instance. The message broker provides an extensible interface for injecting a pub/sub service that keeps worker caches in sync.

### `setup(levelTwo: LevelTwo): Promise<void>`

Signal for when the LevelTwo instance has been started.

### `teardown(levelTwo: LevelTwo): Promise<void>`

Signal for when the LevelTwo instance has been stopped.

### `publish(action: DistributedAction): Promise<void>`

Method for publishing actions that should be distributed to every LevelTwo process.

### `subscribe(listener: (action: DistributedAction) => void): Promise<void>`

Method for listening for actions that should be taken on every LevelTwo process.
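A minimal sketch of a custom broker backed by Redis pub/sub using the ioredis client (the channel name, JSON serialization, and class shape are all assumptions, not part of LevelTwo; `LevelTwo` and `DistributedAction` are the types described above):

```ts
import Redis from "ioredis";

const CHANNEL = "leveltwo-actions"; // illustrative channel name

class RedisMessageBroker {
  private pub = new Redis();
  private sub = new Redis();

  async setup(levelTwo: LevelTwo): Promise<void> {
    await this.sub.subscribe(CHANNEL);
  }

  async teardown(levelTwo: LevelTwo): Promise<void> {
    await this.sub.unsubscribe(CHANNEL);
    this.pub.disconnect();
    this.sub.disconnect();
  }

  async publish(action: DistributedAction): Promise<void> {
    // Serialize the action so every subscribed instance can replay it.
    await this.pub.publish(CHANNEL, JSON.stringify(action));
  }

  async subscribe(listener: (action: DistributedAction) => void): Promise<void> {
    this.sub.on("message", (_channel: string, message: string) => {
      listener(JSON.parse(message) as DistributedAction);
    });
  }
}
```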
## Remote Cache (Persisting Cache Entries)
While workers are a more performant alternative to cache services like Redis or Memcached, they have a flaw around persistence: the cache is fully lost on every code deploy or application restart. If a worker fetch method is computationally expensive, this can become costly in cases like a flaky Docker container that restarts often and has to re-trigger the fetch on each cycle.

The remote cache provides an extensible interface for keeping a worker cache and an external cache in sync. It does not listen for changes in the remote cache itself, relying on end-to-end use of LevelTwo for signals to be sent properly for multi-server cache updates.
### `setup(levelTwo: LevelTwo): Promise<void>`

Signal for when the LevelTwo instance has been started.

### `teardown(levelTwo: LevelTwo): Promise<void>`

Signal for when the LevelTwo instance has been stopped.

### `get(worker: WorkerIdentifierType, ids: any[], earlyWrite: (id: any, value: Result) => Promise<void>): Promise<Result[] | Map<any, Result> | void>`

Fetches a list of unique identifiers (namespaced by worker id) from the external cache. This is called before the actual worker; if a result is found, the worker will not be called for that specific id.

Note: Workers follow the BurstValve paradigm, which means multiple independent fetches could be waiting on this one call to complete. To follow that pattern, an early write mechanism is provided to unblock parallel queues as soon as data comes in.

### `set(worker: WorkerIdentifierType, entries: CacheEntry[]): Promise<void>`

Sets new entries into the external cache. This is only triggered when a user declares that a value has changed.

### `delete(worker: WorkerIdentifierType, ids: any[]): Promise<void>`

Deletes data from the external cache. This is only triggered when a user declares that a value has been removed.

### `touch(worker: WorkerIdentifierType, ids: CacheTouch[]): Promise<void>`

Extends the ttl on the ids provided. Should ignore any keys that don't exist.

### `exists(worker: WorkerIdentifierType, ids: any[]): Promise<boolean[]>`

Checks to see if the ids provided exist in the remote cache.
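To make the interface concrete, a minimal sketch of a remote cache backed by Redis via ioredis (the key scheme, JSON serialization, `CacheEntry`/`CacheTouch` shapes, string worker ids, and default TTL are all assumptions):

```ts
import Redis from "ioredis";

const DEFAULT_TTL = 60 * 1000; // fallback matching the cacheDefaults example

class RedisRemoteCache {
  private redis = new Redis();

  private key(worker: string, id: any): string {
    return `leveltwo:${worker}:${String(id)}`; // illustrative key scheme
  }

  async setup(): Promise<void> {}

  async teardown(): Promise<void> {
    this.redis.disconnect();
  }

  async get(
    worker: string,
    ids: any[],
    earlyWrite: (id: any, value: any) => Promise<void>
  ): Promise<void> {
    // Early-write each hit as it is parsed so parallel queues unblock.
    const raw = await this.redis.mget(...ids.map((id) => this.key(worker, id)));
    await Promise.all(
      raw.map((value, i) =>
        value === null ? Promise.resolve() : earlyWrite(ids[i], JSON.parse(value))
      )
    );
  }

  async set(
    worker: string,
    entries: { id: any; value: any; ttl?: number }[]
  ): Promise<void> {
    const pipeline = this.redis.pipeline();
    for (const { id, value, ttl } of entries) {
      pipeline.set(this.key(worker, id), JSON.stringify(value), "PX", ttl ?? DEFAULT_TTL);
    }
    await pipeline.exec();
  }

  async delete(worker: string, ids: any[]): Promise<void> {
    await this.redis.del(...ids.map((id) => this.key(worker, id)));
  }

  async touch(worker: string, ids: { id: any; ttl?: number }[]): Promise<void> {
    const pipeline = this.redis.pipeline();
    for (const { id, ttl } of ids) {
      pipeline.pexpire(this.key(worker, id), ttl ?? DEFAULT_TTL);
    }
    await pipeline.exec();
  }

  async exists(worker: string, ids: any[]): Promise<boolean[]> {
    const pipeline = this.redis.pipeline();
    for (const id of ids) {
      pipeline.exists(this.key(worker, id));
    }
    const results = (await pipeline.exec()) ?? [];
    return results.map(([, count]) => count === 1);
  }
}
```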