
Already

already is a set of promise helper functions, many of which are also found in libraries such as Bluebird.

The functions are standalone and depend on no particular Promise implementation, and therefore work well with JavaScript's built-in Promise.

The library is written in TypeScript, so typings are provided. It is exported only as an ESM package!

Versions

  • Since version 2, Finally and Try are removed. They should be replaced with Promise.prototype.finally and async functions.
  • Since version 3, it's only exported as an ESM package.

Types

Functions

  • concurrent
     Run a function with a certain concurrency
  • delay
     Create a promise which is resolved after a certain time
  • tap
     "Listen" to a promise value in a .then-chain without modifying the value
  • props
     Promise.all but for objects/properties
  • filter
     Asynchronous version of Array.prototype.filter
  • map
     Asynchronous version of Array.prototype.map
  • flatMap
     Asynchronous version of Array.prototype.flatMap
  • reduce
     Asynchronous version of Array.prototype.reduce
  • each
     Asynchronous version of Array.prototype.forEach
  • some
     Asynchronous version of Array.prototype.some
  • once
     Wrap a function and ensure it only runs once (with asynchrony)
  • retry
     Asynchronously retry a function call
  • defer
     Create a promise and extract its resolve/reject functions
  • deferSet
     Create a set of deferred promises
  • reflect
     Get a promise's resolved value or rejected error in a success flow
  • inspect
     Inspect a promise. Is it pending? Is it rejected?
  • specific
     Catch specific types, like many languages have error type matching in subsequent catch statements
  • rethrow
     Ensure a callback re-throws (to not silently swallow errors)
  • timeout
     Timeout a promise (race it against a timer)
  • wrapFunction
     Wrap a function with a potentially asynchronous prolog and/or epilog (e.g. init/cleanup)
  • funnel
     Ensure certain parts of a function are executed without concurrency (think asynchrony barrier)

Types

PromiseOf

PromiseOf< P > returns the Promise-wrapped type of P, unless P is already a promise, in which case the promise type itself is returned.

  • For P (being Promise< E >), it returns P
    • E.g. Promise< string > → Promise< string >
  • For non-promise E, it returns Promise< E >
    • E.g. string → Promise< string >

PromiseElement

PromiseElement< P > returns the element type of a promise, or the type itself if it isn't wrapped in a promise.

  • For P (being Promise< E >), it returns E
    • E.g. Promise< string > → string
  • For non-promise E, it returns E
    • E.g. string → string

EnsurePromise

EnsurePromise< P > returns P if it is a promise. Otherwise the type is never.

EnsureNotPromise

EnsureNotPromise< T > returns T if it is not a promise. Otherwise the type is never.

IfPromise

IfPromise< P, T[, U = never] > returns T if P is a promise, otherwise returns U.

IfNotPromise

IfNotPromise< P, T[, U = never] > returns U if P is a promise, otherwise returns T.
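
The type helpers above can be approximated with conditional types. The following is only a sketch, assuming they behave exactly as described; every name ending in `Sketch` is hypothetical and is not the library's own definition.

```typescript
// Hypothetical re-creations of the documented type helpers (not the library's source).
type PromiseOfSketch< P > = P extends Promise< unknown > ? P : Promise< P >;
type PromiseElementSketch< P > = P extends Promise< infer E > ? E : P;
type IfPromiseSketch< P, T, U = never > = P extends Promise< unknown > ? T : U;
type IfNotPromiseSketch< P, T, U = never > = P extends Promise< unknown > ? U : T;

// Both annotations below collapse to Promise< string >.
const fromPromise: PromiseOfSketch< Promise< string > > = Promise.resolve( "a" );
const fromValue: PromiseOfSketch< string > = Promise.resolve( "b" );

// Both annotations below collapse to string.
const elementOfPromise: PromiseElementSketch< Promise< string > > = "c";
const elementOfValue: PromiseElementSketch< string > = "d";

// The IfPromise sketch picks its second type argument for promises;
// the IfNotPromise sketch picks its third.
const picked: IfPromiseSketch< Promise< number >, "yes", "no" > = "yes";
const pickedNot: IfNotPromiseSketch< Promise< number >, "yes", "no" > = "no";
```

If any of the assignments did not match the resolved type, the compiler would reject it, which makes such declarations a cheap way to check type-level assumptions.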

Functions

concurrent

Since version 2 of this package, the dependency on throat was removed. This function works like throat; it wraps a function with a concurrency limit, returning a new function that can be called repeatedly, but which runs the underlying function at most the provided number of times concurrently.

The function takes a concurrency option, and optionally the function to be wrapped. If the second argument isn't passed, the returned function takes a function as first argument. This allows you to run separate functions, yet guarantee a maximum concurrency.

import { concurrent } from 'already'

// function readSomethingFromDb(): Promise<any>;

const concurrently = concurrent( 3, readSomethingFromDb );

// This will ensure <readSomethingFromDb> isn't called more than 3 times concurrently
const results = await Promise.all(
    listOfIds.map( id => concurrently( id ) )
);

or without specifying the function, so that different functions can share concurrency:

import { concurrent } from 'already'

const concurrently = concurrent( 3 );

const results = await Promise.all(
    listOfThings.map( thing =>
        typeof thing === 'string'
        ? concurrently( readSomethingElse, thing )
        : concurrently( readSomethingFromDb, thing )
    )
);
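
To illustrate what such a wrapper has to do internally, here is a minimal sketch of a concurrency limiter. It is not the library's implementation; `limitSketch` and the other names are hypothetical, and the sketch assumes the wrapped functions are async (they return promises rather than throwing synchronously).

```typescript
// Hypothetical limiter illustrating the idea; this is not the library's source.
function limitSketch( max: number )
{
    let running = 0;
    const waiting: Array< ( ) => void > = [ ];

    const release = ( ) =>
    {
        const next = waiting.shift( );
        if ( next )
            next( ); // Hand the free slot straight to the oldest waiter.
        else
            --running;
    };

    return function run< R >( fn: ( ) => Promise< R > ): Promise< R >
    {
        const start = ( ) =>
        {
            const result = fn( );
            result.then( release, release ); // Free the slot on success or failure.
            return result;
        };

        if ( running < max )
        {
            ++running;
            return start( );
        }
        // All slots are taken: park until a finished call hands over its slot.
        return new Promise< void >( resolve => waiting.push( ( ) => resolve( ) ) )
            .then( start );
    };
}

let activeJobs = 0;
let peakJobs = 0;
const runLimited = limitSketch( 2 );

// Five jobs are requested at once, but at most two run at any time.
const limitedResults = Promise.all( [ 1, 2, 3, 4, 5 ].map( n =>
    runLimited( async ( ) =>
    {
        peakJobs = Math.max( peakJobs, ++activeJobs );
        await new Promise< void >( resolve => setTimeout( ( ) => resolve( ), 5 ) );
        --activeJobs;
        return n * 2;
    } )
) );
```

Handing the slot directly to the next waiter, instead of decrementing and re-incrementing the counter, keeps a late caller from slipping in ahead of calls that are already queued.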

delay

The standalone delay function takes a milliseconds argument and returns a promise which is resolved after that time. An optional value can be given too, in which case the returned promise resolves to that value.

import { delay } from 'already'

delay( 100 ).then( ( ) => console.log( "100ms has passed" ) )
// or
delay( 100, "foo" ).then( val => console.log( val ) )

It can also be used to delay a promise chain if it is resolved, using delayChain. The delay will be ignored if the upstream promise contains an error.

import { delayChain } from 'already'

somePromise
.then( delayChain( 100 ) )

To always delay a chain, regardless of whether it was resolved or rejected, use:

somePromise
.finally( delayChain( 100 ) )
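
The relationship between the two helpers can be sketched as follows. This is an assumption-laden illustration built on setTimeout, not the library's code; `delaySketch` and `delayChainSketch` are hypothetical names.

```typescript
// Hypothetical stand-ins for delay / delayChain; not the library's code.
function delaySketch< T >( ms: number, value: T ): Promise< T >
{
    return new Promise< T >( resolve => setTimeout( ( ) => resolve( value ), ms ) );
}

function delayChainSketch( ms: number )
{
    // Returns a .then( ) handler that forwards the upstream value after `ms`.
    return < T >( value: T ) => delaySketch( ms, value );
}

// The upstream value "foo" is passed through unchanged, just later.
const delayedSketch = Promise.resolve( "foo" ).then( delayChainSketch( 10 ) );
```

Because the handler is registered with .then( ), it is simply never invoked for a rejected upstream promise, which is why the delay is skipped in that case.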

tap

A similar function to then is tap which is called only on resolved promises. The callback cannot alter the value flow of the promise, i.e. it cannot have a return value. This is useful for logging/debugging, etc. If it returns a promise, it will be awaited before letting the flow continue down the promise chain.

Note; If the tap callback either throws an error, or returns a promise which is rejected, the flow will continue with this error and not the upstream value.

import { tap } from 'already'

somePromise
.then( tap( value => { /* tap handler where the value is available */ } ) )
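
A tap-like helper can be sketched in a few lines. The version below is hypothetical (`tapSketch` is not the library's function) and only mirrors the behavior described above: the handler's return value is discarded, and the upstream value continues.

```typescript
// Hypothetical tap-like helper; the real implementation may differ.
function tapSketch< T >( handler: ( value: T ) => unknown )
{
    return async ( value: T ): Promise< T > =>
    {
        await handler( value ); // A returned promise is awaited; its result is discarded.
        return value;           // The upstream value continues down the chain.
    };
}

const seenByTap: number[ ] = [ ];
const tappedSketch = Promise.resolve( 21 )
    .then( tapSketch( ( n: number ) => { seenByTap.push( n ); return "ignored"; } ) )
    .then( n => n * 2 ); // Still receives 21, not "ignored"
```

If the handler throws or its promise rejects, the `await` propagates that error, matching the note above about errors replacing the upstream value.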

props

As an alternative to Promise.all( ) which awaits all promises in an array, props( ) awaits all properties in an object.

The properties are enumerated and awaited as Promise.all( ), so if any of the promises are rejected, the same flow will happen as when calling Promise.all( ) (i.e. the returned promise will contain the error/errors).

import { props } from 'already'

props( { a: someValue, b: somePromise } )
.then( ( { a, b } ) => { /* a and b are now values (not promises) */ } )

props can also be used in a promise chain, by just referring to the function rather than calling it.

import { props } from 'already'

Promise.resolve( { a: someValue, b: somePromise } )
.then( props )
.then( ( { a, b } ) => { /* a and b are now values (not promises) */ } )
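
For intuition, a props-like helper can be built on top of Promise.all. The sketch below is hypothetical (`propsSketch` is not the library's function) and deliberately uses loose typing; the real function provides precise typings.

```typescript
// Hypothetical props-like helper built on Promise.all; not the library's source.
async function propsSketch< T extends Record< string, unknown > >( obj: T )
{
    const keys = Object.keys( obj );
    // Promise.all awaits promises and passes plain values through untouched.
    const values = await Promise.all( keys.map( key => obj[ key ] ) );
    const out: Record< string, unknown > = { };
    keys.forEach( ( key, i ) => { out[ key ] = values[ i ]; } );
    return out;
}

const resolvedProps = propsSketch( { a: 1, b: Promise.resolve( "x" ) } );
```

Since everything is funneled through a single Promise.all call, a rejection of any property rejects the result, as described above.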

filter

The filter helper can operate on arrays of promises, and will do the same as waiting for all promises in the array and then applying array.filter( ) on the result. If the filter callback returns a promise, it will be awaited (and expected to eventually become a boolean). This eventual value will determine whether to include the value or not in the resulting array.

import { filter } from 'already'

somePromiseToAnArrayOfPromisesAndValues
.then( filter( item => item.shouldBeIncluded ) )

filter concurrency

By default, the values will be filtered as fast as possible, but sometimes it is preferable to only spawn n number of filter callback calls concurrently, e.g. if they perform network/database requests. This can be done by providing an optional object with the concurrency property set. This will include awaiting both the upstream values (if the array contains promises) as well as the filter callback results if they are promises. New filter callbacks will not be called if more than n promises are being awaited.

import { filter } from 'already'

somePromiseToAnArrayOfPromisesAndValues
.then( filter( { concurrency: 4 }, item => item.shouldBeIncluded( ) ) )

filter without a promise chain

The filter function can be called without a promise chain, and act on an array of values or promises as the first argument.

import { filter } from 'already'

const outArray = await filter( inArray, filterFun );
// or with custom concurrency:
const outArray = await filter( inArray, { concurrency: 4 }, filterFun );

filter operations chunked by idle time

Some filter operations (predicate functions) are heavy on calculations. To not starve the system (e.g. a browser) of CPU resources, the filter can be chopped up into smaller chunks, either with a setTimeout(0) or by using requestIdleCallback.

The options used to specify concurrency can instead specify chunk. This implies a concurrency of 1, i.e. no concurrency. Chunking is mostly useful in synchronously heavy operations, not asynchronous.

Specify a chunk time explicitly, e.g. 50ms:

import { filter } from 'already'

const outArray = await filter( inArray, { chunk: 50 }, filterFun );

or use requestIdleCallback to try to maintain a hang-free experience in browsers:

import { filter } from 'already'

const outArray = await filter( inArray, { chunk: 'idle' }, filterFun );

map

Same as with filter, map acts like awaiting all promises in an array, and then applying array.map( ) on the result. Also, just like with filter, it will await the resulting promises from the map callback (if they actually are promises).

import { map } from 'already'

somePromiseToAnArrayOfPromisesAndValues
.then( map( item => JSON.stringify( item ) ) )

map concurrency

Like with filter, map allows a custom concurrency.

import { map } from 'already'

somePromiseToAnArrayOfPromisesAndValues
.then( map( { concurrency: 4 }, item => queryDB( item ) ) )

map without a promise chain

The map function can be called without a promise chain, just like filter.

import { map } from 'already'

const outArray = await map( inArray, mapFun );
// or with custom concurrency:
const outArray = await map( inArray, { concurrency: 4 }, mapFun );

map operations chunked by idle time

Some map operations (mapper functions) are heavy on calculations, just like filter predicates. For the same reasons, you can specify chunk to split up a map operation, so that (synchronously) heavy map operations do not starve the system of CPU resources.

Specify a chunk time explicitly, e.g. 50ms:

import { map } from 'already'

const outArray = await map( inArray, { chunk: 50 }, mapFun );

or use requestIdleCallback to try to maintain a hang-free experience in browsers:

import { map } from 'already'

const outArray = await map( inArray, { chunk: 'idle' }, mapFun );

flatMap

Same as with map, but flattens the first-level potential arrays, and awaits the values if they are async. Can be used within a promise chain, or standalone taking the input array as first argument, an optional options object, and then the mapper function, just like map.

import { flatMap } from 'already'

somePromiseToAnArrayOfPromisesAndValues
.then( flatMap( { concurrency: 8 }, item =>
    item.hasTwo
    ? [ asyncJob( item.first ), asyncJob( item.second ) ]
    : asyncJob( item )
) )

reduce

Reducing (folding) over an iterable of values or promises is easily done with reduce( ). The reducer function can return promises, and they will be awaited before continuing with the next value.

The mechanism for this follows the reasoning behind Bluebird's reduce in how the initial value is handled, and the last argument in the reducer function being a number, not an array.

import { reduce } from 'already'

somePromiseToAnArray
.then( reduce( reducerFn[, initialValue ] ) )

// or on an array

reduce( arrayOrIterable, reducerFn[, initialValue ] )

If called within a promise chain (as the first example above), the reduce takes one or two arguments, a reducer function and an optional initial value.

If called outside a promise chain, it also takes the array (or any other iterable, or promise to any such) as the first argument.

The reducer function has the form

reduce( accumulator: R, current: T, index: number, length: number ) => R | PromiseLike< R >;

The accumulator has the same type as the return value (although the return can be asynchronous), which is the reduced type R. The current is of type T, which is what the input array consists of (although it may consist of PromiseLike< T > too).

This means that the returned type from reduce doesn't need to be the same as the input, although this is only true if initialValue is set. If it is set, it will be used as the first accumulator, and index will begin at 0. If initialValue is left unset (or is undefined), R and T must be the same, and index will begin at 1, since the first call will use the first index in the input as accumulator and the second as current.

length is the length of the input iterable/array, which is the same logic as in Bluebird, and unlike how JavaScript's Array.prototype.reduce works (where you get the array as fourth argument).
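
The index rules can be made concrete with a small sketch. The helper below is hypothetical (`reduceSketch` is not the library's function), is restricted to numbers to keep the typing simple, and only assumes the semantics described above.

```typescript
type NumberReducerSketch =
    ( accumulator: number, current: number, index: number, length: number ) =>
        number | Promise< number >;

// Hypothetical reduce for numbers that follows the index rules described above.
async function reduceSketch(
    input: Array< number | Promise< number > >,
    reducer: NumberReducerSketch,
    initialValue?: number
): Promise< number | undefined >
{
    const values = await Promise.all( input );
    return values.reduce< Promise< number | undefined > >(
        ( previous, current, index ) => previous.then( accumulator =>
            accumulator === undefined
            ? current // No initial value: the first element seeds the accumulator.
            : reducer( accumulator, current, index, values.length )
        ),
        Promise.resolve( initialValue )
    );
}

const indicesWithInitial: number[ ] = [ ];
const indicesWithoutInitial: number[ ] = [ ];

// With an initial value, the reducer sees indices 0, 1, 2 and the sum is 16.
const sumWithInitial = reduceSketch(
    [ 1, Promise.resolve( 2 ), 3 ],
    ( acc, cur, index ) => { indicesWithInitial.push( index ); return acc + cur; },
    10
);

// Without one, the reducer sees indices 1, 2 and the sum is 6.
const sumWithoutInitial = reduceSketch(
    [ 1, Promise.resolve( 2 ), 3 ],
    ( acc, cur, index ) => { indicesWithoutInitial.push( index ); return acc + cur; }
);
```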

each

each iterates an array of promises or values, very much like map, although with a default concurrency of 1.

The iterator function cannot return a value (or it will be ignored), but can return an empty promise which will be awaited before the next iteration. It's like tap but for elements in an array.

The return value of each is the input array unmodified.

If any of the iterator function calls throws an exception, or returns a rejected promise, the iteration will end and the return of each will be a promise rejected with this error.

import { each } from 'already'

somePromiseToAnArrayOfPromisesAndValues
.then( each( item => { doSomethingWith( item ); } ) )
.then( /* input array is here and unmodified */ )

// or provide the array as first argument:

const outArray = await each( inArray, iteratorFun );
// outArray ~ inArray, not necessarily the *same* array, but the same content

Concurrency and time-chunking

Just like filter and map have concurrency and time-chunking options, so does each. An optional argument before the predicate/iterator function can be used.

For concurrency:

import { each } from 'already'

await each( array, { concurrency: 4 }, iteratorFun );

and for time-chunking:

import { each } from 'already'

// Time-chunk every 50 milliseconds
await each( array, { chunk: 50 }, iteratorFun );
// Time-chunk dynamically based on requestIdleCallback()
await each( array, { chunk: 'idle' }, iteratorFun );

some

Just like filter, map and reduce, which are implemented here to closely mimic the Array prototype functions while supporting asynchrony, some works similarly to Array.prototype.some(). The return is different though, in that it doesn't necessarily return a promise to true or false, but rather a promise of the truthy value (of type T) or false.

The return value not being coerced to true upon match makes it ideal in situations where reduce would otherwise be used only to find the first match. some may perform better, since it stops iterating on the first match, while reduce would complete the iteration before it returns.

Like filter, map and reduce above, it supports a promise to a list, promises as values in the list, and an asynchronous predicate function.

import { some } from 'already'

somePromiseToAnArray
.then( some( predicateFn ) )
.then( ( t: T | false ) => { ... } ) // T is the return type of predicateFn

// or on an array

const t = await some( arrayOrIterable, predicateFn );
// t is of type T (the return type of predicateFn) or false

Example

import { some } from 'already'

const arr = [ 1, 2, 3 ];

async function pred( num: number ): Promise< string >
{
    // ... Implementation goes here
}

const val = await some( arr, pred );
// val is now either a string (the first truthy match) or false
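
The early-exit behavior can be sketched as below. This is a hypothetical helper (`someSketch`), limited to number inputs and string results for simplicity, and it is not the library's implementation.

```typescript
// Hypothetical some-like helper for number inputs and string results.
async function someSketch(
    input: number[ ],
    predicate: ( value: number ) => string | false | Promise< string | false >
): Promise< string | false >
{
    for ( const value of input )
    {
        const result = await predicate( value );
        if ( result )
            return result; // First truthy value wins; later items are never visited.
    }
    return false;
}

const visitedBySome: number[ ] = [ ];
const firstMatch = someSketch(
    [ 1, 2, 3 ],
    async ( n ): Promise< string | false > =>
    {
        visitedBySome.push( n );
        return n >= 2 ? `match:${ n }` : false;
    }
);
// firstMatch resolves to "match:2", and the predicate never sees 3
```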

once

To ensure a function is only called once, use once(). It handles both synchronous and asynchronous functions, in that you can await the wrapped function call. It will return the value returned from the wrapped function, every time the wrapper is called. It also comes in two shapes:

import { once } from 'already'

// Single function
const once1 = once( myFunction ); // Wrap a function
const ret1 = once1( ); // Will invoke myFunction
const ret2 = once1( ); // Will do nothing
// ret1 === ret2

// Multiple functions
const once2 = once( ); // Make dynamic wrapper
once2( myFunction1 ); // Will invoke myFunction1
once2( myFunction2 ); // Will invoke myFunction2
once2( myFunction1 ); // Will do nothing
once2( myFunction2 ); // Will do nothing

The dynamic approach is achieved by calling once( ) without arguments. The result wrapper can be called with different functions, and every unique function will only be invoked once.

If the functions are asynchronous, just await the wrapper call:

// Single function
const once1 = once( myFunction ); // Wrap a function
await once1( ); // Will invoke myFunction
await once1( ); // Will do nothing

// Multiple functions
const once2 = once( ); // Make dynamic wrapper
await once2( myFunction1 ); // Will invoke myFunction1
await once2( myFunction2 ); // Will invoke myFunction2
await once2( myFunction1 ); // Will do nothing
await once2( myFunction2 ); // Will do nothing

Even if the functions are invoked immediately after each other, they won't be invoked twice, but they will all wait for the wrapped function to complete:

async function myFunction( ) { ... }
const once1 = once( myFunction );
const promise = once1( ); // Will invoke myFunction
await once1( ); // Will not invoke myFunction, but await its completion!

You can pass an argument to the function if it takes one. It will still only call the function once, regardless of the argument (unlike memoize functions):

const once1 = once( ( n: number ) => n * 3 ); // Wrap a function
12 === await once1( 4 ); // Will invoke the wrapped function
12 === await once1( 5 ); // Will do nothing (but return the old value)
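
The single-function shape can be sketched as a small closure. `onceSketch` below is hypothetical and only mirrors the documented behavior; it does not cover the dynamic (argument-less) shape.

```typescript
// Hypothetical once-like wrapper; not the library's implementation.
function onceSketch< A extends unknown[ ], R >( fn: ( ...args: A ) => R )
{
    let called = false;
    let result: R | undefined;

    return ( ...args: A ): R =>
    {
        if ( !called )
        {
            called = true;
            result = fn( ...args ); // Only the first call's arguments are ever used.
        }
        return result as R;
    };
}

let invocations = 0;
const tripleOnce = onceSketch( ( n: number ) => { ++invocations; return n * 3; } );
const firstResult = tripleOnce( 4 );  // 12
const secondResult = tripleOnce( 5 ); // Still 12; the function is not invoked again
```

For an async function, the stored result is the promise itself, so every caller receives the same promise and waits for the same completion.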

retry

The retry( ) function can be used to call a function and "retry" (call it again) if it threw an exception, or returned a rejected promise.

The retry( times, fn [, retryable ] ) function takes a number for maximum number of retries as first argument, and the function to call as the second argument. If times is 1, it will retry once, i.e. potentially calling fn two times.

The return value of retry is the same as that of fn as it will return the result of a successful call to fn( ).

The function transparently handles callback functions (fn) that return values or promises.

The third and optional argument is a predicate function taking the error thrown/rejected from fn. It should return true if the error is retryable, and false if the error is not retryable and should propagate out of retry immediately.

Synchronous example:

function tryOpenFileSync( ) { /* ... */ } // Might throw

// Only retry ENOENT errors
const fd = retry(
    Infinity,
    tryOpenFileSync,
    err => err.code === 'ENOENT'
);

Asynchronous example:

async function sendMessage( ) { /* ... */ } // Might return a rejected promise

// Try sending 3 times. NOTE: await
const anything = await retry( 3, sendMessage );
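
The meaning of times (the number of retries, not of calls) can be sketched with a synchronous loop. `retrySketch` is a hypothetical stand-in that ignores the asynchronous case, and is not the library's code.

```typescript
// Hypothetical synchronous retry loop mirroring the documented arguments.
function retrySketch< R >(
    times: number,
    fn: ( ) => R,
    retryable: ( err: unknown ) => boolean = ( ) => true
): R
{
    for ( let retriesLeft = times; ; --retriesLeft )
    {
        try
        {
            return fn( );
        }
        catch ( err )
        {
            // Give up when retries are exhausted or the error is not retryable.
            if ( retriesLeft <= 0 || !retryable( err ) )
                throw err;
        }
    }
}

let attempts = 0;
const flakyResult = retrySketch( 3, ( ) =>
{
    if ( ++attempts < 3 )
        throw new Error( "not yet" );
    return "ok";
} );
// flakyResult is "ok" after 3 attempts (1 initial call + 2 retries)
```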

defer

The defer function template returns an object containing both a promise and its resolve/reject functions. This is generally an anti-pattern, and new Promise( ... ) should be preferred, but this is sometimes necessary (or at least very useful).

import { defer } from 'already'

const deferred = defer< string >( );
deferred.promise; // The promise.
deferred.resolve; // The resolve function.
deferred.reject;  // The reject function.

deferred.resolve( "foo" ); // deferred.promise is now resolved to "foo"

Empty defer

To create a defer object backed by a Promise< void >, creating it through defer< void >( ) will not suffice. The returned object's resolve function will require an argument. Instead, create with an explicit void argument:

const deferred = defer( void 0 );
deferred.resolve( ); // This is now legal, typewise
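
What a defer object amounts to can be sketched with the Promise constructor. `deferSketch` and `DeferredSketch` are hypothetical names; the library's actual types may differ.

```typescript
interface DeferredSketch< T >
{
    promise: Promise< T >;
    resolve: ( value: T | PromiseLike< T > ) => void;
    reject: ( reason?: unknown ) => void;
}

// Hypothetical defer: captures the executor's resolve/reject for later use.
function deferSketch< T >( ): DeferredSketch< T >
{
    let resolve!: DeferredSketch< T >[ "resolve" ];
    let reject!: DeferredSketch< T >[ "reject" ];
    // The executor runs synchronously, so both are assigned before we return.
    const promise = new Promise< T >( ( res, rej ) => { resolve = res; reject = rej; } );
    return { promise, resolve, reject };
}

const deferredSketch = deferSketch< string >( );
deferredSketch.resolve( "foo" ); // deferredSketch.promise is now resolved to "foo"
```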

deferSet

Instead of creating a lot of defer objects, e.g. in unit tests to trigger asynchrony in a certain order, deferSet is a cleaner way.

A "defer set" is a dynamically growable set of indexes (numbers) which can be awaited, resolved or rejected at any time.

deferSet( ) returns an object (of a class OrderedAsynchrony). This has the helper functions:

  • wait( index | [indices...] ) -> Promise< void >
  • resolve( index | [indices...] ) -> Promise< void >
  • reject( index | [indices...] ) -> Promise< void >

import { deferSet } from 'already'

const order = deferSet( );

order.resolve( 0 ); // Resolve index 0
await order.wait( 0 ); // Wait for promise 0 (which was resolved above)

The above will work fine, it's basically creating a defer, resolving it and then awaiting its promise. This will deadlock:

await order.wait( 0 ); // Will wait forever
order.resolve( 0 );

It's possible to wait, resolve and reject multiple indices at once, by specifying an array instead. And wait can take an optional index (or array of indices) to resolve, as well as an optional index (or array of indices) to reject.

The return value of wait( ), resolve( ) and reject( ) is a promise and the defer set itself.

// Do stuff, and eventually trigger certain index resolutions.
doFoo( ).then( ( ) => { order.resolve( 0 ); } ); // Eventually resolves index 0
doBar( ).then( ( ) => { order.resolve( [ 1, 3 ] ); } ); // Eventually resolves index 1 and 3
// etc.

await order.wait( [ 0, 1, 3 ], 2 ); // Await index 0, 1 and 3, resolve index 2.
order.reject( 4 ); // Will reject index 4 with an error.
await order.wait( 4 ); // Will (asynchronously) throw.

reflect

A promise can be either resolved or rejected, but sometimes it's convenient to have a shared flow for either occasion. That's when reflect comes in handy. It takes a promise as argument, and returns a promise to a Reflection object which contains the value or error, and the booleans isResolved and isRejected.

import { reflect } from 'already'

const somePromise = Math.random( ) < 0.5
    ? Promise.resolve( 1 )
    : Promise.reject( new Error( ) );

const reflection = await reflect( somePromise );
const { value, error, isResolved, isRejected } = reflection;

if ( isResolved )
    doSomethingWithValue( value );
else
    handleError( error );

The Reflection type has the form:

interface Reflection< T >
{
	error?: Error;
	value?: T;
	isResolved: boolean;
	isRejected: boolean;
}
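
A reflect-like helper maps both outcomes into the success flow. The sketch below is hypothetical (`reflectSketch`, `ReflectionSketch`) and only assumes the Reflection shape shown above.

```typescript
interface ReflectionSketch< T >
{
    error?: Error;
    value?: T;
    isResolved: boolean;
    isRejected: boolean;
}

// Hypothetical reflect: both outcomes end up as a resolved ReflectionSketch.
function reflectSketch< T >( promise: Promise< T > ): Promise< ReflectionSketch< T > >
{
    return promise.then(
        ( value ): ReflectionSketch< T > =>
            ( { value, isResolved: true, isRejected: false } ),
        ( error: Error ): ReflectionSketch< T > =>
            ( { error, isResolved: false, isRejected: true } )
    );
}

const reflectedOk = reflectSketch( Promise.resolve( 1 ) );
const reflectedFail = reflectSketch( Promise.reject< number >( new Error( "boom" ) ) );
// Neither promise rejects; the error is carried inside the reflection instead
```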

inspect

In some cases it is useful to synchronously know if a promise is pending, resolved or rejected. Some promise libraries provide this on the promise itself, e.g. as isPending( ) functions.

With already, wrap the promise in an InspectablePromise using the inspect( ) function.

import { inspect } from 'already'

const inspectable = inspect( somePromise );
inspectable.promise    // <Promise> A new promise, chained from `somePromise`
inspectable.isPending  // <boolean>
inspectable.isResolved // <boolean>
inspectable.isRejected // <boolean>

Note; The returned object's promise must be used in the rest of the application, rather than the upstream promise (the one given as argument to inspect). It is technically not the same promise, and a rejection will otherwise likely result in an "Unhandled promise rejection" warning, or worse.

Note; The returned object will always be in pending-mode when the function returns, i.e. isPending will be true and isResolved and isRejected will both be false. Only after the next tick will these values have been settled. To ensure the right value "immediately", await the inspect return, to allow the value to settle:

import { inspect } from 'already'

const inspectable = await inspect( somePromise );
// inspectable.is{Pending|Resolved|Rejected} are now settled
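
Why the flags start out as pending becomes clearer with a sketch. `inspectSketch` below is hypothetical and not the library's implementation; it relies only on the fact that .then( ) handlers never run synchronously.

```typescript
interface InspectableSketch< T >
{
    promise: Promise< T >;
    isPending: boolean;
    isResolved: boolean;
    isRejected: boolean;
}

// Hypothetical inspect: flags are flipped by handlers chained on the upstream promise.
function inspectSketch< T >( upstream: Promise< T > ): InspectableSketch< T >
{
    const inspectable =
        { isPending: true, isResolved: false, isRejected: false } as InspectableSketch< T >;

    inspectable.promise = upstream.then(
        value => { inspectable.isPending = false; inspectable.isResolved = true; return value; },
        err => { inspectable.isPending = false; inspectable.isRejected = true; throw err; }
    );
    return inspectable;
}

const inspectedSketch = inspectSketch( Promise.resolve( "done" ) );
// Even for an already resolved upstream promise, the handlers have not run yet:
const pendingRightAway = inspectedSketch.isPending; // true
```

The sketch also shows why the returned promise, rather than the upstream one, should be used afterwards: a rejection is re-thrown into the new promise, which then needs its own handler.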

deferInspectable

A combination of defer and inspect is sometimes useful, where deferInspectable comes in handy.

import { deferInspectable } from 'already'

const deferred = deferInspectable< T >( );
deferred.promise    // The promise.
deferred.resolve;   // The resolve function.
deferred.reject;    // The reject function.
deferred.isPending  // <boolean>
deferred.isResolved // <boolean>
deferred.isRejected // <boolean>

For promises of void type, in TypeScript create it with deferInspectable( void 0 ).

Unlike inspect, the values are immediately correct, no await is necessary to settle the values. Also, when resolve() and reject() are called, the is* booleans are synchronously set.

specific

The specific function can be used in a .catch( ... ) handler to filter the catch for specific errors only. Its logic is taken from Bluebird's catch.

The syntax is

specific( filter | [ filters ], handlerFn )

where the filter (or an array of such) is either an error constructor, a predicate function or an object, and handlerFn is the error handler.

Error constructors are checked with instanceof, predicate functions get the error object and must return true or false, and custom objects are shallowly checked key-by-key for == match. If the predicate function throws, the promise chain will contain this error.

import { specific } from 'already'

somePromise
.catch( specific( MyError, err => { /* handler */ } ) )
.catch( specific( isHttpClientError, err => { /* handler */ } ) )
.catch( specific( { minorIssue: true }, err => { /* handler */ } ) )
.catch( err => { /* any other error, OR if the above error handlers threw */ } )
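
The three filter kinds can be sketched as follows. Both helpers are hypothetical (`matchesSketch`, `specificSketch`), use loose typing for brevity, and assume that an error constructor is recognized by its prototype chain; the library may detect filters differently.

```typescript
// Hypothetical filter matching; loosely typed on purpose.
function matchesSketch( err: any, filter: any ): boolean
{
    if ( typeof filter === "function" )
        // Error constructors use instanceof; any other function is a predicate.
        return filter === Error || filter.prototype instanceof Error
            ? err instanceof filter
            : filter( err ) === true;

    // Plain objects: shallow, key-by-key == comparison.
    return Object.keys( filter ).every( key => err != null && err[ key ] == filter[ key ] );
}

function specificSketch< R >( filters: unknown, handler: ( err: any ) => R )
{
    const list: unknown[ ] = Array.isArray( filters ) ? filters : [ filters ];

    return ( err: unknown ): R =>
    {
        if ( list.some( filter => matchesSketch( err, filter ) ) )
            return handler( err );
        throw err; // Not a match: pass the error on to the next .catch( ).
    };
}

const handledSketch = Promise.reject( new RangeError( "boom" ) )
    .catch( specificSketch( TypeError, ( ) => "type error" ) )                    // Skipped
    .catch( specificSketch( { message: "boom" }, ( ) => "matched by object" ) );  // Handles it
```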

rethrow

Another catch helper is rethrow which allows a function to be called as an error handler, but ensures it rethrows the upstream error. Note; if the callback function throws an error, or returns a rejected promise, this error will flow through rather than the upstream error.

The callback can either return nothing (synchronously) or an empty promise, which will be awaited before continuing with rethrowing.

The callback will get the error as argument.

import { rethrow } from 'already'

somePromise
.catch( rethrow( err => { /* handler */ } ) )
// the promise is still rejected

or, combined with specific:

import { specific, rethrow } from 'already'

somePromise
.catch( specific( MyError, rethrow( err => { /* handler */ } ) ) )
.catch( err => { /* handler */ } ) // will always be called, if somePromise was rejected
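
A rethrow-like helper is essentially a handler that always ends by throwing its input. The sketch below is hypothetical (`rethrowSketch`) and not the library's code.

```typescript
// Hypothetical rethrow: run the handler, then keep the chain rejected.
function rethrowSketch( handler: ( err: unknown ) => void | Promise< void > )
{
    return async ( err: unknown ): Promise< never > =>
    {
        await handler( err ); // If this throws or rejects, that error wins.
        throw err;            // Otherwise the upstream error continues.
    };
}

const loggedErrors: unknown[ ] = [ ];
const stillRejected = Promise.reject( new Error( "upstream" ) )
    .catch( rethrowSketch( err => { loggedErrors.push( err ); } ) );

// A later handler still sees the original error.
const recoveredMessage = stillRejected.catch( ( err: Error ) => err.message );
```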

timeout

To race a promise against a timer (to run code within a certain timeframe), use timeout. It is basically a Promise.race() against a delay(), with a nice API. The first argument is the promise to race, and the second is the number of milliseconds to wait for at most.

The promise returned from timeout() will never be rejected. It will be resolved within the timeout period.

The value of the returned promise is an object of the form:

interface TimeoutValue< T >
{
	timedout: boolean;
	reflection?: Reflection< T >; // If the promise did NOT timeout
	promise: Promise< T >;
}

Code can check if timedout is true or false. If it's true, the promise property can be used to further wait for the completion (at least a catch should be registered to handle errors). If timedout is false, the reflection property is of type Reflection and contains the value or error.

import { timeout } from 'already'

const { timedout, reflection, promise } = await timeout( somePromise, 3000 );

if ( timedout )
{
    // The promise timed out
    promise.catch( err =>
        console.error( `Timed out promise eventually failed`, err.stack )
    );
}
else
{
    // The promise was resolved or rejected
    if ( reflection.isResolved )
    {
        doSomething( reflection.value );
    }
    else
    {
        handleError( reflection.error );
    }
}

wrapFunction

In many cases, wrapping a function with custom 'before' and 'after' hooks is useful, e.g. in unit tests. When working with asynchronous code, this may sound easier than it really is, especially in a type safe manner. The 'before' handler, the wrapped function and the 'after' handler can all be either synchronous or asynchronous, and the returned (wrapped) function should reflect this and be synchronous if possible, otherwise asynchronous.

wrapFunction takes a 'before' handler (a function) which is supposed to return an 'after' handler. It returns a new function which takes the target function as argument and performs the invocation by 1) calling the 'before' function, 2) calling the target function and 3) calling the 'after' function (returned by the 'before' function).

import { wrapFunction } from 'already'

const wrapFactory = wrapFunction(
    ( ) =>
    {
        // Do stuff before
        console.log( "before" );
        // ...

        return ( ) =>
        {
            // Do stuff after, e.g. clean up
            console.log( "after" );
        }
    }
);

function aUsefulFunction( )
{
    // Imagine this function to be useful, and we want to wrap it
    console.log( "useful" );
    return "yo";
}

// Call aUsefulFunction but wrap the call
const ret = wrapFactory( aUsefulFunction );

expect( ret ).to.equal( "yo" );

// Console output:
// before
// useful
// after

The before handler can also take an optional argument, which then must be provided when invoking the wrapper.

const wrapFactory = wrapFunction(
    ( hookData: string ) =>
    {
        // Do stuff before
        console.log( hookData );
        // ...

        return ( ) =>
        {
            // Do stuff after, e.g. clean up
            console.log( "after" );
        }
    }
);

function aUsefulFunction( )
{
    // Imagine this function to be useful, and we want to wrap it
    console.log( "useful" );
    return "yo";
}

// Call aUsefulFunction but wrap the call
const ret = wrapFactory( "before", aUsefulFunction );

expect( ret ).to.equal( "yo" );

// Console output:
// before
// useful
// after

And all three functions can be synchronous or asynchronous, e.g.

const wrapFactory = wrapFunction(
    async ( hookData: string ) =>
    {
        // Do stuff before
        console.log( hookData );
        // ...

        return ( ) =>
        {
            // Do stuff after, e.g. clean up
            console.log( "after" );
        }
    }
);

function aUsefulFunction( )
{
    // Imagine this function to be useful, and we want to wrap it
    console.log( "useful" );
    return "yo";
}

// Call aUsefulFunction but wrap the call
const ret = await wrapFactory( "before", aUsefulFunction );

expect( ret ).to.equal( "yo" );

// Console output:
// before
// useful
// after

funnel

Ensuring exclusive calls to a function can be implemented in multiple ways. With asynchrony, this gets quite complicated.

Many problems can be generalized to only running one function at a time (awaiting it if necessary). For this, concurrent is useful. Sometimes more fine-grained control is desired, such as allowing a test and early return, as well as signalling that the concurrent logic is complete (to allow the next function call) before the whole function is complete. This results in a more understandable flow.

For this, funnel() is extremely handy.

Consider the following example

async function getConnection( )
{
    const conn = await getReusableConnection( );
    if ( conn ) // We have a re-usable connection or will wait for one to be free
        return conn;

    // We can create (at least) 1 more connection, but maybe only 1
    const newConn = await connect( );
    registerToConnectionPool( newConn ); // This is now re-usable
    return newConn;
}

The above is a connection pool, where we might only want a certain number of connections. In this simple example, we could make a counter and check its value, but sometimes the counter isn't static, and sometimes asynchronous "questions" must be asked in order to know whether to proceed or not.

Is the above code safe? It isn't. Two synchronously immediate calls to getConnection will likely get the same answer from getReusableConnection, i.e. falsy. This means, they'll both call connect, although maybe just one should have done so. Only one should have created a connection, then registerToConnectionPool while the other should wait until the first is complete, then retry getConnection from scratch to see if a connection can be re-used.

The getConnection could be wrapped inside a concurrent wrapper, but that wouldn't be as performant as possible. Consider two calls to getConnection when there are connections in the pool, but none is free. One of the two calls should create a new connection, but while this takes place (which may take time), another might be freed. This newly freed connection should be re-usable by the second call to getConnection.

funnel makes this trivial. Wrap the getConnection logic in a funnel. Allow concurrent access to getReusableConnection which is concurrency safe. Then create a synchronization barrier (using shouldRetry/retry):

import { funnel } from "already";

const connectionFunnel = funnel< Connection >( );
// Or if pure JavaScript, just:
// const connectionFunnel = funnel( );

async function getConnection( )
{
    return connectionFunnel( async ( shouldRetry, retry ) =>
    {
        const conn = await getReusableConnection( );
        if ( conn ) // We have a re-usable connection or will wait for one to be free
            return conn;

        if ( shouldRetry( ) ) // <-- this and
            return retry( );  // <-- this, is the key

        // We can create (at least) 1 more connection, but maybe only 1
        const newConn = await connect( );
        registerToConnectionPool( newConn ); // This is now re-usable
        return newConn;
    } );
}

When creating a funnel, an options object can be provided with two options:

  • onEmpty [callback]: will be called when the last concurrent task has finished. This can be used for cleanup. Note; This can be called multiple times, as it is called whenever there are no pending/waiting tasks anymore.
  • concurrency [number]: Specifies how many concurrent tasks to allow before shouldRetry returns true. (Defaults to 1).

The callback function to the funnel can take a third argument after shouldRetry and retry, which is a function called shortcut. This can be used to signal that the function is complete (in terms of synchronization) earlier than when its returned promise is resolved:

import { funnel } from "already";

const onEmpty = ( ) => console.log( "Concurrent tasks finished" );
const connectionFunnel = funnel( { onEmpty } );

async function getConnection( )
{
    return connectionFunnel( async ( shouldRetry, retry, shortcut ) =>
    {
        // Before shouldRetry there is no synchronization, this can be called
        // concurrently.
        const conn = await getReusableConnection( );
        if ( conn )
            return conn;

        if ( shouldRetry( ) )
            return retry( );

        // Synchronization begins
        const newConn = await connect( );
        registerToConnectionPool( newConn );
        // Synchronization ends
        shortcut( ); // This will signal that synchronization is complete,
                     // let concurrent tasks (if any) retry immediately.
        return decorateConnection( newConn ); // Maybe (asynchronously) slow
    } );
}


Package last updated on 11 Jan 2023
