already
Advanced tools
Comparing version 2.0.0 to 2.1.0
@@ -57,2 +57,15 @@ declare const _default: { | ||
export declare type EnsureNotPromise<T> = T extends Promise<infer _U> ? never : T; | ||
export declare type Callback<R, A extends any[]> = (...args: A) => Promise<R>; | ||
/** | ||
* Create a maximum concurrency for fn (can be curried) | ||
* | ||
* Either specify fn and invoke the returned function, or skip fn and the | ||
* returned function will take an arbitrary function to limit concurrency for. | ||
* | ||
* @param size Concurrency limit | ||
* @param fn The function to limit the concurrency for | ||
* @returns Concurrency-limited version of fn | ||
*/ | ||
export declare function concurrent<R, A extends any[]>(size: number, fn: Callback<R, A>): (...args: Parameters<typeof fn>) => Promise<ReturnType<typeof fn>>; | ||
export declare function concurrent(size: number): <R, A extends any[]>(fn: Callback<R, A>, ...a: A) => Promise<R>; | ||
export declare function delay(milliseconds: number): Promise<void>; | ||
@@ -147,4 +160,2 @@ export declare function delay<T>(milliseconds: number, t: T): Promise<T>; | ||
export declare function deferInspectable(v: void): EmptyDeferredInspectable; | ||
export declare function Try<T, U extends Promise<T>>(cb: () => U): Promise<T>; | ||
export declare function Try<T>(cb: () => T): Promise<T>; | ||
export declare type ErrorFilterFunction = (err: Error) => boolean; | ||
@@ -151,0 +162,0 @@ export interface ErrorFilterObject { |
@@ -1,2 +0,1 @@ | ||
import throat from "throat"; | ||
export default { | ||
@@ -29,2 +28,42 @@ defer, | ||
} | ||
export function concurrent(size, fn) { | ||
const queue = makeQueue(size); | ||
if (size < 1) | ||
throw new RangeError(`Size must be at least 1`); | ||
if (!fn) | ||
return (cb, ...args) => queue.enqueue(() => cb(...args)); | ||
else | ||
return (...args) => queue.enqueue(() => fn(...args)); | ||
} | ||
function makeQueue(size) { | ||
const queue = { | ||
size, | ||
count: 0, | ||
queue: [], | ||
process: () => { | ||
if (queue.queue.length) { | ||
const first = queue.queue.shift(); | ||
const { cb, deferred } = first; | ||
queue.runOne(cb).then(deferred.resolve, deferred.reject); | ||
} | ||
}, | ||
runOne: (cb) => { | ||
++queue.count; | ||
return (async () => cb())() | ||
.finally(() => { | ||
--queue.count; | ||
queue.process(); | ||
}); | ||
}, | ||
enqueue: async (cb) => { | ||
if (queue.count >= queue.size) { | ||
const deferred = defer(); | ||
queue.queue.push({ cb, deferred }); | ||
return deferred.promise; | ||
} | ||
return queue.runOne(cb); | ||
} | ||
}; | ||
return queue; | ||
} | ||
export function delay(milliseconds, t) { | ||
@@ -91,7 +130,7 @@ return new Promise(resolve => { | ||
const promiseMapFn = (t, index, arr) => Promise.resolve(mapFn(t, index, arr)); | ||
const throated = throat(concurrency); | ||
const concurrently = concurrent(concurrency); | ||
return (t) => { | ||
return Promise.resolve(t) | ||
.then((values) => toReadonlyArray(values).map((val, index, arr) => throated(() => Promise.resolve(val)) | ||
.then((val) => throated(() => promiseMapFn(val, index, arr))))) | ||
.then((values) => toReadonlyArray(values).map((val, index, arr) => (() => Promise.resolve(val))() | ||
.then((val) => concurrently(promiseMapFn, val, index, arr)))) | ||
.then(values => Promise.all(values)); | ||
@@ -228,2 +267,3 @@ }; | ||
export function defer() { | ||
var _a; | ||
const deferred = {}; | ||
@@ -234,2 +274,10 @@ deferred.promise = new Promise((resolve, reject) => { | ||
}); | ||
/* istanbul ignore next */ | ||
if (((_a = process === null || process === void 0 ? void 0 : process.env) === null || _a === void 0 ? void 0 : _a.JEST_WORKER_ID) !== undefined) | ||
try { | ||
// Jest has decided for many versions to break async catching, | ||
// so this is needed for unit tests not to break unnecessarily. | ||
deferred.promise.catch(() => { }); | ||
} | ||
catch (_err) { } | ||
return deferred; | ||
@@ -301,5 +349,2 @@ } | ||
} | ||
export async function Try(cb) { | ||
return cb(); | ||
} | ||
// This logic is taken from Bluebird | ||
@@ -493,3 +538,3 @@ function catchFilter(filters, err) { | ||
const runner = () => { | ||
return Try(() => fn(shouldRetry, retry, shortcut)) | ||
return (async () => fn(shouldRetry, retry, shortcut))() | ||
.finally(shortcut); | ||
@@ -496,0 +541,0 @@ }; |
@@ -57,2 +57,15 @@ declare const _default: { | ||
export declare type EnsureNotPromise<T> = T extends Promise<infer _U> ? never : T; | ||
export declare type Callback<R, A extends any[]> = (...args: A) => Promise<R>; | ||
/** | ||
* Create a maximum concurrency for fn (can be curried) | ||
* | ||
* Either specify fn and invoke the returned function, or skip fn and the | ||
* returned function will take an arbitrary function to limit concurrency for. | ||
* | ||
* @param size Concurrency limit | ||
* @param fn The function to limit the concurrency for | ||
* @returns Concurrency-limited version of fn | ||
*/ | ||
export declare function concurrent<R, A extends any[]>(size: number, fn: Callback<R, A>): (...args: Parameters<typeof fn>) => Promise<ReturnType<typeof fn>>; | ||
export declare function concurrent(size: number): <R, A extends any[]>(fn: Callback<R, A>, ...a: A) => Promise<R>; | ||
export declare function delay(milliseconds: number): Promise<void>; | ||
@@ -147,4 +160,2 @@ export declare function delay<T>(milliseconds: number, t: T): Promise<T>; | ||
export declare function deferInspectable(v: void): EmptyDeferredInspectable; | ||
export declare function Try<T, U extends Promise<T>>(cb: () => U): Promise<T>; | ||
export declare function Try<T>(cb: () => T): Promise<T>; | ||
export declare type ErrorFilterFunction = (err: Error) => boolean; | ||
@@ -151,0 +162,0 @@ export interface ErrorFilterObject { |
"use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
exports.deferSet = exports.OrderedAsynchrony = exports.funnel = exports.wrapFunction = exports.rethrow = exports.specific = exports.Try = exports.deferInspectable = exports.inspect = exports.reflect = exports.defer = exports.retry = exports.once = exports.some = exports.eachImpl = exports.each = exports.reduce = exports.map = exports.filter = exports.props = exports.tap = exports.delayChain = exports.delay = void 0; | ||
const throat_1 = require("throat"); | ||
exports.deferSet = exports.OrderedAsynchrony = exports.funnel = exports.wrapFunction = exports.rethrow = exports.specific = exports.deferInspectable = exports.inspect = exports.reflect = exports.defer = exports.retry = exports.once = exports.some = exports.eachImpl = exports.each = exports.reduce = exports.map = exports.filter = exports.props = exports.tap = exports.delayChain = exports.delay = exports.concurrent = void 0; | ||
exports.default = { | ||
@@ -32,2 +31,43 @@ defer, | ||
} | ||
function concurrent(size, fn) { | ||
const queue = makeQueue(size); | ||
if (size < 1) | ||
throw new RangeError(`Size must be at least 1`); | ||
if (!fn) | ||
return (cb, ...args) => queue.enqueue(() => cb(...args)); | ||
else | ||
return (...args) => queue.enqueue(() => fn(...args)); | ||
} | ||
exports.concurrent = concurrent; | ||
function makeQueue(size) { | ||
const queue = { | ||
size, | ||
count: 0, | ||
queue: [], | ||
process: () => { | ||
if (queue.queue.length) { | ||
const first = queue.queue.shift(); | ||
const { cb, deferred } = first; | ||
queue.runOne(cb).then(deferred.resolve, deferred.reject); | ||
} | ||
}, | ||
runOne: (cb) => { | ||
++queue.count; | ||
return (async () => cb())() | ||
.finally(() => { | ||
--queue.count; | ||
queue.process(); | ||
}); | ||
}, | ||
enqueue: async (cb) => { | ||
if (queue.count >= queue.size) { | ||
const deferred = defer(); | ||
queue.queue.push({ cb, deferred }); | ||
return deferred.promise; | ||
} | ||
return queue.runOne(cb); | ||
} | ||
}; | ||
return queue; | ||
} | ||
function delay(milliseconds, t) { | ||
@@ -99,7 +139,7 @@ return new Promise(resolve => { | ||
const promiseMapFn = (t, index, arr) => Promise.resolve(mapFn(t, index, arr)); | ||
const throated = throat_1.default(concurrency); | ||
const concurrently = concurrent(concurrency); | ||
return (t) => { | ||
return Promise.resolve(t) | ||
.then((values) => toReadonlyArray(values).map((val, index, arr) => throated(() => Promise.resolve(val)) | ||
.then((val) => throated(() => promiseMapFn(val, index, arr))))) | ||
.then((values) => toReadonlyArray(values).map((val, index, arr) => (() => Promise.resolve(val))() | ||
.then((val) => concurrently(promiseMapFn, val, index, arr)))) | ||
.then(values => Promise.all(values)); | ||
@@ -243,2 +283,3 @@ }; | ||
function defer() { | ||
var _a; | ||
const deferred = {}; | ||
@@ -249,2 +290,10 @@ deferred.promise = new Promise((resolve, reject) => { | ||
}); | ||
/* istanbul ignore next */ | ||
if (((_a = process === null || process === void 0 ? void 0 : process.env) === null || _a === void 0 ? void 0 : _a.JEST_WORKER_ID) !== undefined) | ||
try { | ||
// Jest has decided for many versions to break async catching, | ||
// so this is needed for unit tests not to break unnecessarily. | ||
deferred.promise.catch(() => { }); | ||
} | ||
catch (_err) { } | ||
return deferred; | ||
@@ -320,6 +369,2 @@ } | ||
exports.deferInspectable = deferInspectable; | ||
async function Try(cb) { | ||
return cb(); | ||
} | ||
exports.Try = Try; | ||
// This logic is taken from Bluebird | ||
@@ -516,3 +561,3 @@ function catchFilter(filters, err) { | ||
const runner = () => { | ||
return Try(() => fn(shouldRetry, retry, shortcut)) | ||
return (async () => fn(shouldRetry, retry, shortcut))() | ||
.finally(shortcut); | ||
@@ -519,0 +564,0 @@ }; |
@@ -5,3 +5,3 @@ { | ||
"license": "MIT", | ||
"version": "2.0.0", | ||
"version": "2.1.0", | ||
"author": "Gustaf Räntilä <g.rantila@gmail.com>", | ||
@@ -16,4 +16,4 @@ "repository": { | ||
], | ||
"main": "./dist/index.js", | ||
"types": "./dist/index.d.ts", | ||
"main": "./dist/index.js", | ||
"exports": { | ||
@@ -31,3 +31,3 @@ "import": "./dist-mjs/index.js", | ||
"build": "yarn build:lib && yarn build:test", | ||
"lint": "node_modules/.bin/tslint --project .", | ||
"lint": "true", | ||
"jest": "node_modules/.bin/jest --coverage", | ||
@@ -47,3 +47,2 @@ "test": "yarn lint && yarn jest", | ||
"filter", | ||
"finally", | ||
"props", | ||
@@ -64,3 +63,3 @@ "tap", | ||
"commitizen": "^4.2.2", | ||
"concurrently": "^5.3.0", | ||
"concurrently": "^6.2.0", | ||
"cz-conventional-changelog": "^3.3.0", | ||
@@ -70,10 +69,8 @@ "jest": "^26.6.3", | ||
"rimraf": "^3.0.2", | ||
"trace-unhandled": "^1.0.1", | ||
"trace-unhandled": "^2.0.1", | ||
"ts-jest": "^26.4.4", | ||
"tslint": "6.1.3", | ||
"typescript": "4.1.2" | ||
"typescript": "4.3.4" | ||
}, | ||
"dependencies": { | ||
"throat": "^5.0.0" | ||
}, | ||
"dependencies": {}, | ||
"config": { | ||
@@ -80,0 +77,0 @@ "commitizen": { |
@@ -12,7 +12,6 @@ [![npm version][npm-image]][npm-url] | ||
The functions are standalone and depends on no particular Promise implementation and therefore works well for Javascript's built-in Promise. | ||
The functions are standalone and depends on no particular Promise implementation and therefore works well for JavaScript's built-in Promise. | ||
This library is written in TypeScript but is exposed as ES7 (if imported as `already`) and ES5 (if imported as `already/es5`). Typings are provided too, so any TypeScript project using this library will automatically get full type safety of all the functions. | ||
The library is written in TypeScript, so typings are provided. Apart from being exported as JavaScript (ES2019), it's also exported as an *ES module*, if imported in platforms (and bundlers) supporting this. | ||
The library is also exported as an *ES module*, if imported in platforms (and bundlers) supporting this. | ||
@@ -34,2 +33,4 @@ # Versions | ||
* [concurrent](#concurrent) | ||
<br> Run a function with certain concurrency | ||
* [delay](#delay) | ||
@@ -117,2 +118,38 @@ <br> Create a promise which resolved after a certain time | ||
## concurrent | ||
Since version 2 of this package, the dependency on `throat` was removed. This function works like throat; it wraps a function with concurrency, returning a new function that can be called repeatedly, but will only call the underlying function with the provided concurrency. | ||
The function takes a concurrency option, and optionally the function to be wrapped. If the second argument isn't passed, the returned function takes a function as first argument. This allows you to run separate functions, yet guarantee a maximum concurrency. | ||
```ts | ||
import { concurrent } from 'already' | ||
// function readSomethingFromDb(): Promise<any>; | ||
const concurrently = concurrent( 3, readSomethingFromDb ); | ||
// This will ensure <readSomethingFromDb> isn't called more than 3 times concurrently | ||
const results = await Promise.all( | ||
listOfIds.map( id => concurrently( id ) ) | ||
); | ||
``` | ||
or without specifying the function, so that different functions can share concurrency: | ||
```ts | ||
import { concurrent } from 'already' | ||
const concurrently = concurrent( 3 ); | ||
const results = await Promise.all( | ||
listOfThings.map( thing => | ||
typeof thing === 'string' | ||
? concurrently( readSomethingElse, thing ) | ||
: concurrently( readSomethingFromDb, thing ) | ||
) | ||
); | ||
``` | ||
## delay | ||
@@ -119,0 +156,0 @@ |
97690
0
1619
891
- Removedthroat@^5.0.0
- Removedthroat@5.0.0(transitive)