Utilities for JavaScript Promise and AsyncFunction.
Install
npm install --save extra-promise
yarn add extra-promise
API
functions
delay
function delay(timeout: number): Promise<void>
A simple wrapper for setTimeout
.
timeout
function timeout(ms: number): Promise<never>
It throws a TimeoutError
after ms
milliseconds.
try {
result = await Promise.race([
fetchData()
, timeout(5000)
])
} catch (e) {
if (e instanceof TimeoutError) ...
}
pad
function pad<T>(ms: number, fn: () => T | PromiseLike<T>): Promise<T>
Run a function, but wait at least ms
milliseconds before returning.
parallel
function parallel(
tasks: Iterable<() => unknown | PromiseLike<unknown>>
, concurrency: number = Infinity
): Promise<void>
Perform tasks in parallel.
The value range of concurrency
is [1, Infinity].
Invalid values will throw Error
.
parallelAsync
function parallelAsync(
tasks: AsyncIterable<() => unknown | PromiseLike<unknown>>
,
concurrency: number
): Promise<void>
Same as parallel
, but tasks
is an AsyncIterable
.
series
function series(
tasks: Iterable<() => unknown | PromiseLike<unknown>>
| AsyncIterable<() => unknown | PromiseLike<unknown>>
): Promise<void>
Perform tasks in order.
Equivalent to parallel(tasks, 1)
.
waterfall
function waterfall<T>(
tasks: Iterable<(result: unknown) => unknown | PromiseLike<unknown>>
| AsyncIterable<(result: unknown) => unknown | PromiseLike<unknown>>
): Promise<T | undefined>
Perform tasks in order, the return value of the previous task will become the parameter of the next task. If tasks
is empty, return Promise<undefined>
.
each
function each(
iterable: Iterable<T>
, fn: (element: T, i: number) => unknown | PromiseLike<unknown>
, concurrency: number = Infinity
): Promise<void>
The async each
operator for Iterable.
The value range of concurrency
is [1, Infinity].
Invalid values will throw Error
.
eachAsync
function eachAsync<T>(
iterable: AsyncIterable<T>
, fn: (element: T, i: number) => unknown | PromiseLike<unknown>
,
concurrency: number
): Promise<void>
Same as each
, but iterable
is an AsyncIterable
.
map
function map<T, U>(
iterable: Iterable<T>
, fn: (element: T, i: number) => U | PromiseLike<U>
, concurrency: number = Infinity
): Promise<U[]>
The async map
operator for Iterable.
The value range of concurrency
is [1, Infinity].
Invalid values will throw Error
.
mapAsync
export function mapAsync<T, U>(
iterable: AsyncIterable<T>
, fn: (element: T, i: number) => U | PromiseLike<U>
,
concurrency: number
): Promise<U[]>
Same as map
, but iterable
is an AsyncIterable
.
filter
function filter<T, U = T>(
iterable: Iterable<T>
, fn: (element: T, i: number) => boolean | PromiseLike<boolean>
, concurrency: number = Infinity
): Promise<U[]>
The async filter
operator for Iterable.
The value range of concurrency
is [1, Infinity].
Invalid values will throw Error
.
filterAsync
function filterAsync<T, U = T>(
iterable: AsyncIterable<T>
, fn: (element: T, i: number) => boolean | PromiseLike<boolean>
,
concurrency: number
): Promise<U[]>
Same as filter
, but iterable
is an AsyncIterable
.
all
function all<T extends { [key: string]: PromiseLike<unknown> }>(
obj: T
): Promise<{ [Key in keyof T]: UnpackedPromiseLike<T[Key]> }>
It is similar to Promise.all
, but the first parameter is an object.
Example:
const { task1, task2 } = await all({
task1: invokeTask1()
, task2: invokeTask2()
})
promisify
type Callback<T> = (err: any, result?: T) => void
function promisify<Result, Args extends any[] = unknown[]>(
fn: (...args: [...args: Args, callback?: Callback<Result>]) => unknown
): (...args: Args) => Promise<Result>
The well-known promisify
function.
callbackify
type Callback<T> = (err: any, result?: T) => void
function callbackify<Result, Args extends any[] = unknown[]>(
fn: (...args: Args) => Awaitable<Result>
): (...args: [...args: Args, callback: Callback<Result>]) => void
The callbackify
function, as opposed to promisify
.
asyncify
function asyncify<T extends any[], U>(
fn: (...args: T) => U | PromiseLike<U>
): (...args: Promisify<T>) => Promise<U>
Turn sync functions into async functions.
const a = 1
const b = Promise.resolve(2)
const add = (a: number, b: number) => a + b
add(a, await b)
const addAsync = asyncify(add)
await addAsync(a, b)
function toExtraPromise<T>(promise: PromiseLike<T>): ExtraPromise<T>
spawn
function spawn(num: number, task: (id: number) => Promise<void>): Promise<void>
A sugar for running the same task in parallel.
The parameter id
is from 1
to num
.
queueConcurrency
function queueConcurrency<T, Args extends any[]>(
concurrency: number
, fn: (...args: Args) => PromiseLike<T>
): (...args: Args) => Promise<T>
Limit the number of concurrency, calls that exceed the number of concurrency will be delayed in order.
throttleConcurrency
function throttleConcurrency<T, Args extends any[]>(
concurrency: number
, fn: (...args: Args) => PromiseLike<T>
): (...args: Args) => Promise<T> | undefined
Limit the number of concurrency, calls that exceed the number of concurrency will not occur and return undefined
.
throttleUntilDone
function throttleUntilDone<T>(fn: () => PromiseLike<T>): () => Promise<T>
Limit the number of concurrent to 1, calls that exceed the number of concurrency will return the same Promise
of the currently executing call.
reusePendingPromise
type VerboseResult<T> = [value: T, isReuse: boolean]
interface IReusePendingPromiseOptions {
verbose?: true
}
function reusePendingPromise<T, Args extends any[]>(
fn: (...args: Args) => PromiseLike<T>
, options: IReusePendingPromiseOptions & { verbose: true }
): (...args: Args) => Promise<VerboseResult<T>>
function reusePendingPromise<T, Args extends any[]>(
fn: (...args: Args) => PromiseLike<T>
, options: IReusePendingPromiseOptions & { verbose: false }
): (...args: Args) => Promise<T>
function reusePendingPromise<T, Args extends any[]>(
fn: (...args: Args) => PromiseLike<T>
, options: Omit<IReusePendingPromiseOptions, 'verbose'>
): (...args: Args) => Promise<T>
function reusePendingPromise<T, Args extends any[]>(
fn: (...args: Args) => PromiseLike<T>
): (...args: Args) => Promise<T>
Returns a function that will return the same Promise
for calls with the same parameters if the Promise
is pending.
Classes
enum ExtraPromiseState {
Pending = 'pending'
, Fulfilled = 'fulfilled'
, Rejected = 'rejected'
}
class ExtraPromise<T> extends Promise<T> {
get pending(): boolean
get fulfilled(): boolean
get rejected(): boolean
get state(): ExtraPromiseState
constructor(executor: (resolve: (value: T) => void, reject: (reason: any) => void) => void)
}
A subclass of Promise
.
ExtraPromise
has 3 readonly properties: pending
, fulfilled
, and rejected
.
So the state of the Promise
can be known without calling the then
method.
Channel
class Channel<T> {
send(value: T): Promise<void>
receive(): AsyncIterable<T>
close: () => void
}
Implement MPMC(multi-producer, multi-consumer) FIFO queue communication with Promise
and AsyncIterable
.
send
Send value to the channel, block until data is taken out by the consumer.receive
Receive value from the channel.close
Close the channel, no more values can be sent.
If the channel closed, send
will throw ChannelClosedError
.
const chan = new Channel<string>()
queueMicrotask(() => {
await chan.send('hello')
await chan.send('world')
chan.close()
})
for await (const value of chan.receive()) {
console.log(value)
}
BufferedChannel
class BufferedChannel {
send(value: T): Promise<void>
receive(): AsyncIterable<T>
close: () => void
}
Implement MPMC(multi-producer, multi-consumer) FIFO queue communication with Promise
and AsyncIterable
.
When the amount of data sent exceeds bufferSize
, send
will block until data in buffer is taken out by the consumer.
send
Send value to the channel.
If the buffer is full, block.receive
Receive value from the channel.close
Close channel, no more values can be sent.
If the channel closed, send
will throw ChannelClosedError
.
const chan = new BufferedChannel<string>(1)
queueMicrotask(() => {
await chan.send('hello')
await chan.send('world')
chan.close()
})
for await (const value of chan.receive()) {
console.log(value)
}
UnlimitedChannel
class UnlimitedChannel {
send(value: T): void
receive(): AsyncIterable<T>
close: () => void
}
Implement MPMC(multi-producer, multi-consumer) FIFO queue communication with Promise
and AsyncIterable
.
UnlimitedChannel
return a tuple includes three channel functions:
send
Send value to the channel.
There is no size limit on the buffer, all sending will return immediately.receive
Receive value from the channel.close
close the channel, no more values can be sent.
If the channel closed, send
will throw ChannelClosedError
.
const chan = new UnlimitedChannel<string>()
queueMicrotask(() => {
chan.send('hello')
chan.send('world')
chan.close()
})
for await (const value of chan.receive()) {
console.log(value)
}
Deferred
class Deferred<T> implements PromiseLike<T> {
then: PromiseLike<T>['then']
resolve(value: T): void
reject(reason: unknown): void
}
Deferred
is a Promise
that separates resolve()
and reject()
from the constructor.
MutableDeferred
class MutableDeferred<T> implements PromiseLike<T> {
then: PromiseLike<T>['then']
resolve(value: T): void
reject(reason: unknown): void
}
MutableDeferred
is similar to Deferred
,
but its resolve()
and reject()
can be called multiple times to change the value.
const deferred = new MutableDeferred()
deferred.resolve(1)
deferred.resolve(2)
await deferred
ReusableDeferred
class ReusableDeferred<T> implements PromiseLike<T> {
then: PromiseLike<T>['then']
resolve(value: T): void
reject(reason: unknown): void
}
ReusableDeferred
is similar to MutableDeferred
,
but its internal Deferred
will be overwritten with a new pending Deferred
after each call.
const deferred = new ReusableDeferred()
deferred.resolve(1)
queueMicrotask(() => deferred.resolve(2))
await deferred
LazyPromise
class LazyPromise<T> implements PromiseLike<T> {
then: PromiseLike<T>['then']
constructor(executor: (resolve: (value: T) => void, reject: (reason: any) => void) => void)
}
LazyPromise
constructor is the same as Promise
.
The difference with Promise
is that LazyPromise
only performs executor
after then
method is called.
Signal
class Signal implements PromiseLike<void> {
then: PromiseLike<void>['then']
emit(): void
discard(): void
}
A one-time signal.
The emit()
make the internal Promise resolve.
The discard()
make the internal Promise reject SignalDiscarded
.
SignalGroup
class SignalGroup {
add(signal: Signal): void
remove(signal: Signal): void
emitAll(): void
discardAll(): void
}
Semaphore
type Release = () => void
class Semaphore {
constructor(count: number)
acquire(): Promise<Release>
acquire<T>(handler: () => T | PromiseLike<T>): Promise<T>
}
Mutex
type Release = () => void
class Mutex extends Semaphore {
acquire(): Promise<Release>
acquire<T>(handler: () => T | PromiseLike<T>): Promise<T>
}
DebounceMicrotask
class DebounceMicrotask {
queue(fn: () => void): void
cancel(fn: () => void): boolean
}
queue
can create microtasks,
if the microtask is not executed, multiple calls will only queue it once.
cancel
can cancel microtasks before it is executed.
TaskRunner
type Task<T> = () => PromiseLike<T>
class TaskRunner {
constructor(concurrency: number = Infinity)
getConcurrency(): number
setConcurrency(concurrency: number): void
add(task: Task<T>): Promise<T>
clear(): void
start(): void
stop(): void
}
A task runner, it will execute tasks in FIFO order.