@matrixai/async-locks
Advanced tools
Comparing version 3.2.0 to 4.0.0
import type { ResourceRelease } from '@matrixai/resources'; | ||
import type { ContextTimedInput } from './types'; | ||
import { PromiseCancellable } from '@matrixai/async-cancellable'; | ||
import Lock from './Lock'; | ||
declare class Barrier { | ||
protected lock: Lock; | ||
protected count: number; | ||
protected _count: number; | ||
protected release: ResourceRelease; | ||
static createBarrier(count: number): Promise<Barrier>; | ||
protected constructor(count: number, lock: Lock, release: ResourceRelease); | ||
wait(timeout?: number): Promise<void>; | ||
get count(): number; | ||
destroy(): Promise<void>; | ||
wait(ctx?: Partial<ContextTimedInput>): PromiseCancellable<void>; | ||
} | ||
export default Barrier; |
@@ -6,28 +6,37 @@ "use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
const async_cancellable_1 = require("@matrixai/async-cancellable"); | ||
const Lock_1 = __importDefault(require("./Lock")); | ||
const errors_1 = require("./errors"); | ||
class Barrier { | ||
lock; | ||
_count; | ||
release; | ||
static async createBarrier(count) { | ||
const lock = new Lock_1.default(); | ||
const [release] = await lock.lock()(); | ||
return new this(count, lock, release); | ||
} | ||
constructor(count, lock, release) { | ||
if (count < 0) { | ||
throw new errors_1.ErrorAsyncLocksBarrierCount(); | ||
throw new RangeError('Barrier must be constructed with `count` >= than 0'); | ||
} | ||
this.lock = lock; | ||
this.release = release; | ||
this.count = count; | ||
this._count = count; | ||
} | ||
static async createBarrier(count) { | ||
const lock = new Lock_1.default(); | ||
const [release] = await lock.lock()(); | ||
return new this(count, lock, release); | ||
get count() { | ||
return this._count; | ||
} | ||
async wait(timeout) { | ||
async destroy() { | ||
await this.release(); | ||
} | ||
wait(ctx) { | ||
if (!this.lock.isLocked()) { | ||
return; | ||
return async_cancellable_1.PromiseCancellable.resolve(); | ||
} | ||
this.count = Math.max(this.count - 1, 0); | ||
if (this.count === 0) { | ||
await this.release(); | ||
this._count = Math.max(this._count - 1, 0); | ||
if (this._count === 0) { | ||
return async_cancellable_1.PromiseCancellable.from(this.release()); | ||
} | ||
else { | ||
await this.lock.waitForUnlock(timeout); | ||
return this.lock.waitForUnlock(ctx); | ||
} | ||
@@ -34,0 +43,0 @@ } |
@@ -11,8 +11,15 @@ import { AbstractError } from '@matrixai/errors'; | ||
} | ||
declare class ErrorAsyncLocksBarrierCount<T> extends ErrorAsyncLocks<T> { | ||
/** | ||
* If you get this exception, this means within the same `Monitor` instance, | ||
* you tried to lock a read on a key that is already locked for write, or | ||
* you tried to lock a write on a key that is already locked for read. This | ||
* is not supported because to do so would imply a lock upgrade from read to | ||
* write or from write to read. | ||
*/ | ||
declare class ErrorAsyncLocksMonitorLockType<T> extends ErrorAsyncLocks<T> { | ||
static description: string; | ||
} | ||
declare class ErrorAsyncLocksSemaphoreLimit<T> extends ErrorAsyncLocks<T> { | ||
declare class ErrorAsyncLocksMonitorDeadlock<T> extends ErrorAsyncLocks<T> { | ||
static description: string; | ||
} | ||
export { ErrorAsyncLocks, ErrorAsyncLocksTimeout, ErrorAsyncLocksLockBoxConflict, ErrorAsyncLocksBarrierCount, ErrorAsyncLocksSemaphoreLimit, }; | ||
export { ErrorAsyncLocks, ErrorAsyncLocksTimeout, ErrorAsyncLocksLockBoxConflict, ErrorAsyncLocksMonitorLockType, ErrorAsyncLocksMonitorDeadlock, }; |
"use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
exports.ErrorAsyncLocksSemaphoreLimit = exports.ErrorAsyncLocksBarrierCount = exports.ErrorAsyncLocksLockBoxConflict = exports.ErrorAsyncLocksTimeout = exports.ErrorAsyncLocks = void 0; | ||
exports.ErrorAsyncLocksMonitorDeadlock = exports.ErrorAsyncLocksMonitorLockType = exports.ErrorAsyncLocksLockBoxConflict = exports.ErrorAsyncLocksTimeout = exports.ErrorAsyncLocks = void 0; | ||
const errors_1 = require("@matrixai/errors"); | ||
class ErrorAsyncLocks extends errors_1.AbstractError { | ||
static description = 'Async locks error'; | ||
} | ||
exports.ErrorAsyncLocks = ErrorAsyncLocks; | ||
ErrorAsyncLocks.description = 'Async locks error'; | ||
class ErrorAsyncLocksTimeout extends ErrorAsyncLocks { | ||
static description = 'Async locks timeout'; | ||
} | ||
exports.ErrorAsyncLocksTimeout = ErrorAsyncLocksTimeout; | ||
ErrorAsyncLocksTimeout.description = 'Async locks timeout'; | ||
class ErrorAsyncLocksLockBoxConflict extends ErrorAsyncLocks { | ||
static description = 'LockBox cannot lock same ID with different Lockable classes'; | ||
} | ||
exports.ErrorAsyncLocksLockBoxConflict = ErrorAsyncLocksLockBoxConflict; | ||
ErrorAsyncLocksLockBoxConflict.description = 'LockBox cannot lock same ID with different Lockable classes'; | ||
class ErrorAsyncLocksBarrierCount extends ErrorAsyncLocks { | ||
/** | ||
* If you get this exception, this means within the same `Monitor` instance, | ||
* you tried to lock a read on a key that is already locked for write, or | ||
* you tried to lock a write on a key that is already locked for read. This | ||
* is not supported because to do so would imply a lock upgrade from read to | ||
* write or from write to read. | ||
*/ | ||
class ErrorAsyncLocksMonitorLockType extends ErrorAsyncLocks { | ||
static description = 'Monitor does not support upgrading or downgrading the lock type'; | ||
} | ||
exports.ErrorAsyncLocksBarrierCount = ErrorAsyncLocksBarrierCount; | ||
ErrorAsyncLocksBarrierCount.description = 'Barrier must be created with a count >= 0'; | ||
class ErrorAsyncLocksSemaphoreLimit extends ErrorAsyncLocks { | ||
exports.ErrorAsyncLocksMonitorLockType = ErrorAsyncLocksMonitorLockType; | ||
class ErrorAsyncLocksMonitorDeadlock extends ErrorAsyncLocks { | ||
static description = 'Monitor has met a potential deadlock'; | ||
} | ||
exports.ErrorAsyncLocksSemaphoreLimit = ErrorAsyncLocksSemaphoreLimit; | ||
ErrorAsyncLocksSemaphoreLimit.description = 'Semaphore must be created with a limit >= 1'; | ||
exports.ErrorAsyncLocksMonitorDeadlock = ErrorAsyncLocksMonitorDeadlock; | ||
//# sourceMappingURL=errors.js.map |
@@ -7,4 +7,5 @@ export { default as Lock } from './Lock'; | ||
export { default as Semaphore } from './Semaphore'; | ||
export { default as Monitor } from './Monitor'; | ||
export * as utils from './utils'; | ||
export * as errors from './errors'; | ||
export * from './types'; |
@@ -32,3 +32,3 @@ "use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
exports.errors = exports.utils = exports.Semaphore = exports.Barrier = exports.LockBox = exports.RWLockWriter = exports.RWLockReader = exports.Lock = void 0; | ||
exports.errors = exports.utils = exports.Monitor = exports.Semaphore = exports.Barrier = exports.LockBox = exports.RWLockWriter = exports.RWLockReader = exports.Lock = void 0; | ||
var Lock_1 = require("./Lock"); | ||
@@ -46,2 +46,4 @@ Object.defineProperty(exports, "Lock", { enumerable: true, get: function () { return __importDefault(Lock_1).default; } }); | ||
Object.defineProperty(exports, "Semaphore", { enumerable: true, get: function () { return __importDefault(Semaphore_1).default; } }); | ||
var Monitor_1 = require("./Monitor"); | ||
Object.defineProperty(exports, "Monitor", { enumerable: true, get: function () { return __importDefault(Monitor_1).default; } }); | ||
exports.utils = __importStar(require("./utils")); | ||
@@ -48,0 +50,0 @@ exports.errors = __importStar(require("./errors")); |
@@ -1,14 +0,16 @@ | ||
import type { ResourceAcquire } from '@matrixai/resources'; | ||
import type { Lockable } from './types'; | ||
import { Mutex } from 'async-mutex'; | ||
import type { PromiseCancellable } from '@matrixai/async-cancellable'; | ||
import type { ResourceAcquireCancellable, Lockable, ContextTimedInput } from './types'; | ||
import Semaphore from './Semaphore'; | ||
declare class Lock implements Lockable { | ||
protected _lock: Mutex; | ||
protected _count: number; | ||
lock(timeout?: number): ResourceAcquire<Lock>; | ||
protected semaphore: Semaphore; | ||
get count(): number; | ||
isLocked(): boolean; | ||
waitForUnlock(timeout?: number): Promise<void>; | ||
withF<T>(...params: [...([timeout: number] | []), (lock: Lock) => Promise<T>]): Promise<T>; | ||
lock(ctx?: Partial<ContextTimedInput>): ResourceAcquireCancellable<Lock>; | ||
waitForUnlock(ctx?: Partial<ContextTimedInput>): PromiseCancellable<void>; | ||
withF<T>(...params: [ | ||
...([ctx?: Partial<ContextTimedInput>] | []), | ||
(lock: Lock) => Promise<T> | ||
]): Promise<T>; | ||
withG<T, TReturn, TNext>(...params: [ | ||
...([timeout: number] | []), | ||
...([ctx?: Partial<ContextTimedInput>] | []), | ||
(lock: Lock) => AsyncGenerator<T, TReturn, TNext> | ||
@@ -15,0 +17,0 @@ ]): AsyncGenerator<T, TReturn, TNext>; |
"use strict"; | ||
var __importDefault = (this && this.__importDefault) || function (mod) { | ||
return (mod && mod.__esModule) ? mod : { "default": mod }; | ||
}; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
const async_mutex_1 = require("async-mutex"); | ||
const resources_1 = require("@matrixai/resources"); | ||
const utils_1 = require("./utils"); | ||
const errors_1 = require("./errors"); | ||
const Semaphore_1 = __importDefault(require("./Semaphore")); | ||
class Lock { | ||
constructor() { | ||
this._lock = new async_mutex_1.Mutex(); | ||
this._count = 0; | ||
} | ||
lock(timeout) { | ||
return async () => { | ||
++this._count; | ||
let lock = this._lock; | ||
if (timeout != null) { | ||
lock = (0, async_mutex_1.withTimeout)(this._lock, timeout, new errors_1.ErrorAsyncLocksTimeout()); | ||
} | ||
let release; | ||
try { | ||
release = await lock.acquire(); | ||
} | ||
catch (e) { | ||
--this._count; | ||
throw e; | ||
} | ||
let released = false; | ||
return [ | ||
async () => { | ||
if (released) | ||
return; | ||
released = true; | ||
--this._count; | ||
release(); | ||
// Allow semaphore to settle https://github.com/DirtyHairy/async-mutex/issues/54 | ||
await (0, utils_1.yieldMicro)(); | ||
}, | ||
this, | ||
]; | ||
}; | ||
} | ||
semaphore = new Semaphore_1.default(1); | ||
get count() { | ||
return this._count; | ||
return this.semaphore.count; | ||
} | ||
isLocked() { | ||
return this._lock.isLocked(); | ||
return this.semaphore.isLocked(); | ||
} | ||
async waitForUnlock(timeout) { | ||
if (timeout != null) { | ||
let timedOut = false; | ||
await Promise.race([ | ||
this._lock.waitForUnlock(), | ||
(0, utils_1.sleep)(timeout).then(() => { | ||
timedOut = true; | ||
}), | ||
]); | ||
if (timedOut) { | ||
throw new errors_1.ErrorAsyncLocksTimeout(); | ||
} | ||
} | ||
else { | ||
await this._lock.waitForUnlock(); | ||
} | ||
lock(ctx) { | ||
const acquire = this.semaphore.lock(1, ctx); | ||
return () => { | ||
const acquireP = acquire(); | ||
return acquireP.then(([release]) => [release, this], undefined, (signal) => { | ||
// Propagate cancellation to `acquireP` | ||
signal.addEventListener('abort', () => { | ||
acquireP.cancel(signal.reason); | ||
}, { once: true }); | ||
}); | ||
}; | ||
} | ||
async withF(...params) { | ||
waitForUnlock(ctx) { | ||
return this.semaphore.waitForUnlock(1, ctx); | ||
} | ||
withF(...params) { | ||
const f = params.pop(); | ||
const timeout = params[0]; | ||
return (0, resources_1.withF)([this.lock(timeout)], ([lock]) => f(lock)); | ||
return (0, resources_1.withF)([this.lock(...params)], ([lock]) => f(lock)); | ||
} | ||
withG(...params) { | ||
const g = params.pop(); | ||
const timeout = params[0]; | ||
return (0, resources_1.withG)([this.lock(timeout)], ([lock]) => g(lock)); | ||
return (0, resources_1.withG)([this.lock(...params)], ([lock]) => g(lock)); | ||
} | ||
@@ -75,0 +39,0 @@ } |
@@ -1,28 +0,28 @@ | ||
import type { ResourceAcquire } from '@matrixai/resources'; | ||
import type { ToString, Lockable, MultiLockRequest, MultiLockAcquire, MultiLockAcquired } from './types'; | ||
import type { ResourceAcquireCancellable, Lockable, LockRequest, LockAcquireCancellable, LockAcquired, ContextTimedInput } from './types'; | ||
import { PromiseCancellable } from '@matrixai/async-cancellable'; | ||
declare class LockBox<L extends Lockable = Lockable> implements Lockable { | ||
protected _locks: Map<string, L>; | ||
lock(...requests: Array<MultiLockRequest<L>>): ResourceAcquire<LockBox<L>>; | ||
lockMulti(...requests: Array<MultiLockRequest<L>>): Array<MultiLockAcquire<L>>; | ||
get locks(): ReadonlyMap<string, L>; | ||
get count(): number; | ||
isLocked(key?: ToString, ...params: Parameters<L['isLocked']>): boolean; | ||
waitForUnlock(timeout?: number, key?: ToString): Promise<void>; | ||
isLocked(key?: string, ...params: Parameters<L['isLocked']>): boolean; | ||
lock(...params: [...requests: Array<LockRequest<L>>, ctx: Partial<ContextTimedInput>] | [...requests: Array<LockRequest<L>>] | [ctx?: Partial<ContextTimedInput>]): ResourceAcquireCancellable<LockBox<L>>; | ||
lockMulti(...requests: Array<LockRequest<L>>): Array<LockAcquireCancellable<L>>; | ||
waitForUnlock(...params: [key?: string, ctx?: Partial<ContextTimedInput>] | [key?: string] | [ctx?: Partial<ContextTimedInput>] | []): PromiseCancellable<void>; | ||
withF<T>(...params: [ | ||
...requests: Array<MultiLockRequest<L>>, | ||
f: (lockBox: LockBox<L>) => Promise<T> | ||
...([...requests: Array<LockRequest<L>>, ctx: Partial<ContextTimedInput>] | [...requests: Array<LockRequest<L>>] | [ctx?: Partial<ContextTimedInput>]), | ||
(lockBox: LockBox<L>) => Promise<T> | ||
]): Promise<T>; | ||
withMultiF<T>(...params: [ | ||
...requests: Array<MultiLockRequest<L>>, | ||
f: (multiLocks: Array<MultiLockAcquired<L>>) => Promise<T> | ||
...requests: Array<LockRequest<L>>, | ||
f: (multiLocks: Array<LockAcquired<L>>) => Promise<T> | ||
]): Promise<T>; | ||
withG<T, TReturn, TNext>(...params: [ | ||
...requests: Array<MultiLockRequest<L>>, | ||
g: (lockBox: LockBox<L>) => AsyncGenerator<T, TReturn, TNext> | ||
...([...requests: Array<LockRequest<L>>, ctx: Partial<ContextTimedInput>] | [...requests: Array<LockRequest<L>>] | [ctx?: Partial<ContextTimedInput>]), | ||
(lockBox: LockBox<L>) => AsyncGenerator<T, TReturn, TNext> | ||
]): AsyncGenerator<T, TReturn, TNext>; | ||
withMultiG<T, TReturn, TNext>(...params: [ | ||
...requests: Array<MultiLockRequest<L>>, | ||
g: (multiLocks: Array<MultiLockAcquired<L>>) => AsyncGenerator<T, TReturn, TNext> | ||
...requests: Array<LockRequest<L>>, | ||
g: (multiLocks: Array<LockAcquired<L>>) => AsyncGenerator<T, TReturn, TNext> | ||
]): AsyncGenerator<T, TReturn, TNext>; | ||
} | ||
export default LockBox; |
"use strict"; | ||
var __createBinding = (this && this.__createBinding) || (Object.create ? (function(o, m, k, k2) { | ||
if (k2 === undefined) k2 = k; | ||
var desc = Object.getOwnPropertyDescriptor(m, k); | ||
if (!desc || ("get" in desc ? !m.__esModule : desc.writable || desc.configurable)) { | ||
desc = { enumerable: true, get: function() { return m[k]; } }; | ||
} | ||
Object.defineProperty(o, k2, desc); | ||
}) : (function(o, m, k, k2) { | ||
if (k2 === undefined) k2 = k; | ||
o[k2] = m[k]; | ||
})); | ||
var __setModuleDefault = (this && this.__setModuleDefault) || (Object.create ? (function(o, v) { | ||
Object.defineProperty(o, "default", { enumerable: true, value: v }); | ||
}) : function(o, v) { | ||
o["default"] = v; | ||
}); | ||
var __importStar = (this && this.__importStar) || function (mod) { | ||
if (mod && mod.__esModule) return mod; | ||
var result = {}; | ||
if (mod != null) for (var k in mod) if (k !== "default" && Object.prototype.hasOwnProperty.call(mod, k)) __createBinding(result, mod, k); | ||
__setModuleDefault(result, mod); | ||
return result; | ||
}; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
const async_cancellable_1 = require("@matrixai/async-cancellable"); | ||
const resources_1 = require("@matrixai/resources"); | ||
const errors_1 = require("./errors"); | ||
const utils = __importStar(require("./utils")); | ||
const errors = __importStar(require("./errors")); | ||
class LockBox { | ||
constructor() { | ||
this._locks = new Map(); | ||
_locks = new Map(); | ||
get locks() { | ||
return this._locks; | ||
} | ||
lock(...requests) { | ||
return async () => { | ||
// Convert to strings | ||
// This creates a copy of the requests | ||
let requests_ = requests.map(([key, ...rest]) => typeof key === 'string' ? [key, ...rest] : [key.toString(), ...rest]); | ||
// Sort to ensure lock hierarchy | ||
requests_.sort(([key1], [key2]) => { | ||
// Deterministic string comparison according to 16-bit code units | ||
if (key1 < key2) | ||
return -1; | ||
if (key1 > key2) | ||
return 1; | ||
return 0; | ||
}); | ||
// Avoid duplicate locking | ||
requests_ = requests_.filter(([key], i, arr) => i === 0 || key !== arr[i - 1][0]); | ||
const locks = []; | ||
try { | ||
for (const [key, LockConstructor, ...lockingParams] of requests_) { | ||
let lock = this._locks.get(key); | ||
if (lock == null) { | ||
lock = new LockConstructor(); | ||
this._locks.set(key, lock); | ||
} | ||
else { | ||
// It is possible to swap the lock class, but only after the lock key is released | ||
if (!(lock instanceof LockConstructor)) { | ||
throw new errors_1.ErrorAsyncLocksLockBoxConflict(`Lock ${key} is already locked with class ${lock.constructor.name}, which conflicts with class ${LockConstructor.name}`); | ||
get count() { | ||
let count = 0; | ||
for (const lock of this._locks.values()) { | ||
count += lock.count; | ||
} | ||
return count; | ||
} | ||
isLocked(key, ...params) { | ||
if (key == null) { | ||
for (const lock of this._locks.values()) { | ||
if (lock.isLocked(...params)) | ||
return true; | ||
} | ||
return false; | ||
} | ||
else { | ||
const lock = this._locks.get(key); | ||
if (lock == null) | ||
return false; | ||
return lock.isLocked(...params); | ||
} | ||
} | ||
lock(...params) { | ||
let ctx = (!Array.isArray(params[params.length - 1]) ? params.pop() : undefined); | ||
ctx = ctx != null ? { ...ctx } : {}; | ||
const requests = params; | ||
return () => { | ||
return utils.setupTimedCancellable(async (ctx) => { | ||
// This creates a copy of the requests | ||
let requests_ = [...requests]; | ||
// Sort to ensure lock hierarchy | ||
requests_.sort(([key1], [key2]) => { | ||
// Deterministic string comparison according to 16-bit code units | ||
if (key1 < key2) | ||
return -1; | ||
if (key1 > key2) | ||
return 1; | ||
return 0; | ||
}); | ||
// Avoid duplicate locking | ||
requests_ = requests_.filter(([key], i, arr) => i === 0 || key !== arr[i - 1][0]); | ||
const locks = []; | ||
try { | ||
for (const [key, LockConstructor, ...lockingParams] of requests_) { | ||
let lock = this._locks.get(key); | ||
if (lock == null) { | ||
lock = new LockConstructor(); | ||
this._locks.set(key, lock); | ||
} | ||
else { | ||
// It is possible to swap the lock class, but only after the lock key is released | ||
if (!(lock instanceof LockConstructor)) { | ||
throw new errors.ErrorAsyncLocksLockBoxConflict(`Lock ${key} is already locked with class ${lock.constructor.name}, which conflicts with class ${LockConstructor.name}`); | ||
} | ||
} | ||
const lockAcquire = lock.lock(...lockingParams, ctx); | ||
const lockAcquireP = lockAcquire(); | ||
const [lockRelease] = await lockAcquireP; | ||
locks.push([key, lockRelease, lock]); | ||
} | ||
const lockAcquire = lock.lock(...lockingParams); | ||
const [lockRelease] = await lockAcquire(); | ||
locks.push([key, lockRelease, lock]); | ||
} | ||
} | ||
catch (e) { | ||
// Release all intermediate locks in reverse order | ||
locks.reverse(); | ||
for (const [key, lockRelease, lock] of locks) { | ||
await lockRelease(); | ||
// If it is still locked, then it is held by a different context | ||
// only delete if no contexts are locking the lock | ||
if (!lock.isLocked()) { | ||
this._locks.delete(key); | ||
} | ||
} | ||
throw e; | ||
} | ||
let released = false; | ||
return [ | ||
async () => { | ||
if (released) | ||
return; | ||
released = true; | ||
// Release all locks in reverse order | ||
catch (e) { | ||
// Release all intermediate locks in reverse order | ||
locks.reverse(); | ||
@@ -73,13 +107,29 @@ for (const [key, lockRelease, lock] of locks) { | ||
} | ||
}, | ||
this, | ||
]; | ||
throw e; | ||
} | ||
let released = false; | ||
return [ | ||
async () => { | ||
if (released) | ||
return; | ||
released = true; | ||
// Release all locks in reverse order | ||
locks.reverse(); | ||
for (const [key, lockRelease, lock] of locks) { | ||
await lockRelease(); | ||
// If it is still locked, then it is held by a different context | ||
// only delete if no contexts are locking the lock | ||
if (!lock.isLocked()) { | ||
this._locks.delete(key); | ||
} | ||
} | ||
}, | ||
this, | ||
]; | ||
}, true, Infinity, errors.ErrorAsyncLocksTimeout, ctx, []); | ||
}; | ||
} | ||
lockMulti(...requests) { | ||
// Convert to strings | ||
// This creates a copy of the requests | ||
let requests_ = requests.map(([key, ...rest]) => typeof key === 'string' | ||
? [key, key, ...rest] | ||
: [key.toString(), key, ...rest]); | ||
let requests_ = [...requests]; | ||
// Sort to ensure lock hierarchy | ||
@@ -97,35 +147,25 @@ requests_.sort(([key1], [key2]) => { | ||
const lockAcquires = []; | ||
for (const [key, keyOrig, LockConstructor, ...lockingParams] of requests_) { | ||
const lockAcquire = async () => { | ||
let lock = this._locks.get(key); | ||
let lockRelease; | ||
try { | ||
if (lock == null) { | ||
lock = new LockConstructor(); | ||
this._locks.set(key, lock); | ||
} | ||
else { | ||
// It is possible to swap the lock class, but only after the lock key is released | ||
if (!(lock instanceof LockConstructor)) { | ||
throw new errors_1.ErrorAsyncLocksLockBoxConflict(`Lock ${key} is already locked with class ${lock.constructor.name}, which conflicts with class ${LockConstructor.name}`); | ||
for (const [key, LockConstructor, ...lockingParams] of requests_) { | ||
const lockAcquire = () => { | ||
let currentP; | ||
const f = async () => { | ||
let lock = this._locks.get(key); | ||
let lockRelease; | ||
try { | ||
if (lock == null) { | ||
lock = new LockConstructor(); | ||
this._locks.set(key, lock); | ||
} | ||
else { | ||
// It is possible to swap the lock class, but only after the lock key is released | ||
if (!(lock instanceof LockConstructor)) { | ||
throw new errors.ErrorAsyncLocksLockBoxConflict(`Lock ${key} is already locked with class ${lock.constructor.name}, which conflicts with class ${LockConstructor.name}`); | ||
} | ||
} | ||
const lockAcquire = lock.lock(...lockingParams); | ||
const lockAcquireP = lockAcquire(); | ||
currentP = lockAcquireP; | ||
[lockRelease] = await lockAcquireP; | ||
} | ||
const lockAcquire = lock.lock(...lockingParams); | ||
[lockRelease] = await lockAcquire(); | ||
} | ||
catch (e) { | ||
// If it is still locked, then it is held by a different context | ||
// only delete if no contexts are locking the lock | ||
if (!lock.isLocked()) { | ||
this._locks.delete(key); | ||
} | ||
throw e; | ||
} | ||
let released = false; | ||
return [ | ||
async () => { | ||
if (released) | ||
return; | ||
released = true; | ||
await lockRelease(); | ||
catch (e) { | ||
// If it is still locked, then it is held by a different context | ||
@@ -136,55 +176,68 @@ // only delete if no contexts are locking the lock | ||
} | ||
}, | ||
lock, | ||
]; | ||
throw e; | ||
} | ||
let released = false; | ||
return [ | ||
async () => { | ||
if (released) | ||
return; | ||
released = true; | ||
await lockRelease(); | ||
// If it is still locked, then it is held by a different context | ||
// only delete if no contexts are locking the lock | ||
if (!lock.isLocked()) { | ||
this._locks.delete(key); | ||
} | ||
}, | ||
lock, | ||
]; | ||
}; | ||
return async_cancellable_1.PromiseCancellable.from(f(), (signal) => { | ||
signal.addEventListener('abort', () => { | ||
currentP.cancel(signal.reason); | ||
}, { once: true }); | ||
}); | ||
}; | ||
lockAcquires.push([keyOrig, lockAcquire, ...lockingParams]); | ||
lockAcquires.push([key, lockAcquire, ...lockingParams]); | ||
} | ||
return lockAcquires; | ||
} | ||
get locks() { | ||
return this._locks; | ||
} | ||
get count() { | ||
let count = 0; | ||
for (const lock of this._locks.values()) { | ||
count += lock.count; | ||
} | ||
return count; | ||
} | ||
isLocked(key, ...params) { | ||
waitForUnlock(...params) { | ||
const key = params.length === 2 | ||
? params[0] | ||
: typeof params[0] === 'string' | ||
? params[0] | ||
: undefined; | ||
const ctx = params.length === 2 | ||
? params[1] | ||
: typeof params[0] !== 'string' | ||
? params[0] | ||
: undefined; | ||
if (key == null) { | ||
const waitPs = []; | ||
for (const lock of this._locks.values()) { | ||
if (lock.isLocked(...params)) | ||
return true; | ||
waitPs.push(lock.waitForUnlock(ctx)); | ||
} | ||
return false; | ||
const waitP = Promise.all(waitPs).then(() => { }); | ||
return async_cancellable_1.PromiseCancellable.from(waitP, (signal) => { | ||
signal.addEventListener('abort', () => { | ||
waitPs.reverse(); | ||
for (const waitP of waitPs) { | ||
waitP.cancel(signal.reason); | ||
} | ||
}, { once: true }); | ||
}); | ||
} | ||
else { | ||
const lock = this._locks.get(key.toString()); | ||
const lock = this._locks.get(key); | ||
if (lock == null) | ||
return false; | ||
return lock.isLocked(...params); | ||
return async_cancellable_1.PromiseCancellable.resolve(); | ||
return lock.waitForUnlock(ctx); | ||
} | ||
} | ||
async waitForUnlock(timeout, key) { | ||
if (key == null) { | ||
const ps = []; | ||
for (const lock of this._locks.values()) { | ||
ps.push(lock.waitForUnlock(timeout)); | ||
} | ||
await Promise.all(ps); | ||
} | ||
else { | ||
const lock = this._locks.get(key.toString()); | ||
if (lock == null) | ||
return; | ||
await lock.waitForUnlock(timeout); | ||
} | ||
} | ||
async withF(...params) { | ||
withF(...params) { | ||
const f = params.pop(); | ||
return (0, resources_1.withF)([this.lock(...params)], ([lockBox]) => f(lockBox)); | ||
} | ||
async withMultiF(...params) { | ||
withMultiF(...params) { | ||
const f = params.pop(); | ||
@@ -191,0 +244,0 @@ const lockAcquires = this.lockMulti(...params); |
@@ -1,5 +0,5 @@ | ||
import type { MutexInterface } from 'async-mutex'; | ||
import type { ResourceAcquire } from '@matrixai/resources'; | ||
import type { Lockable } from './types'; | ||
import { Mutex } from 'async-mutex'; | ||
import type { ResourceRelease } from '@matrixai/resources'; | ||
import type { ResourceAcquireCancellable, Lockable, ContextTimedInput } from './types'; | ||
import { PromiseCancellable } from '@matrixai/async-cancellable'; | ||
import Lock from './Lock'; | ||
/** | ||
@@ -9,11 +9,9 @@ * Read-preferring read write lock | ||
declare class RWLockReader implements Lockable { | ||
protected readersLock: Mutex; | ||
protected writersLock: Mutex; | ||
protected writersRelease: MutexInterface.Releaser; | ||
protected readersLock: Lock; | ||
protected writersLock: Lock; | ||
protected writersRelease: ResourceRelease; | ||
protected readerCountBlocked: number; | ||
protected _readerCount: number; | ||
protected _writerCount: number; | ||
lock(type?: 'read' | 'write', timeout?: number): ResourceAcquire<RWLockReader>; | ||
read(timeout?: number): ResourceAcquire<RWLockReader>; | ||
write(timeout?: number): ResourceAcquire<RWLockReader>; | ||
protected acquireWritersLockP: PromiseCancellable<readonly [ResourceRelease, Lock?]>; | ||
get count(): number; | ||
@@ -26,20 +24,29 @@ get readerCount(): number; | ||
*/ | ||
isLocked(): boolean; | ||
waitForUnlock(timeout?: number): Promise<void>; | ||
isLocked(type?: 'read' | 'write'): boolean; | ||
lock(...params: [type?: 'read' | 'write', ctx?: Partial<ContextTimedInput>] | [type?: 'read' | 'write'] | [ctx?: Partial<ContextTimedInput>] | []): ResourceAcquireCancellable<RWLockReader>; | ||
read(ctx?: Partial<ContextTimedInput>): ResourceAcquireCancellable<RWLockReader>; | ||
write(ctx?: Partial<ContextTimedInput>): ResourceAcquireCancellable<RWLockReader>; | ||
waitForUnlock(ctx?: Partial<ContextTimedInput>): PromiseCancellable<void>; | ||
withF<T>(...params: [ | ||
...([type: 'read' | 'write', timeout: number] | [type: 'read' | 'write'] | []), | ||
...([type?: 'read' | 'write', ctx?: Partial<ContextTimedInput>] | [type?: 'read' | 'write'] | [ctx?: Partial<ContextTimedInput>] | []), | ||
(lock: RWLockReader) => Promise<T> | ||
]): Promise<T>; | ||
withReadF<T>(...params: [...([timeout: number] | []), (lock: RWLockReader) => Promise<T>]): Promise<T>; | ||
withWriteF<T>(...params: [...([timeout: number] | []), (lock: RWLockReader) => Promise<T>]): Promise<T>; | ||
withReadF<T>(...params: [ | ||
...([ctx?: Partial<ContextTimedInput>] | []), | ||
(lock: RWLockReader) => Promise<T> | ||
]): Promise<T>; | ||
withWriteF<T>(...params: [ | ||
...([ctx?: Partial<ContextTimedInput>] | []), | ||
(lock: RWLockReader) => Promise<T> | ||
]): Promise<T>; | ||
withG<T, TReturn, TNext>(...params: [ | ||
...([type: 'read' | 'write', timeout: number] | [type: 'read' | 'write'] | []), | ||
...([type?: 'read' | 'write', ctx?: Partial<ContextTimedInput>] | [type?: 'read' | 'write'] | [ctx?: Partial<ContextTimedInput>] | []), | ||
(lock: RWLockReader) => AsyncGenerator<T, TReturn, TNext> | ||
]): AsyncGenerator<T, TReturn, TNext>; | ||
withReadG<T, TReturn, TNext>(...params: [ | ||
...([timeout: number] | []), | ||
...([ctx?: Partial<ContextTimedInput>] | []), | ||
(lock: RWLockReader) => AsyncGenerator<T, TReturn, TNext> | ||
]): AsyncGenerator<T, TReturn, TNext>; | ||
withWriteG<T, TReturn, TNext>(...params: [ | ||
...([timeout: number] | []), | ||
...([ctx?: Partial<ContextTimedInput>] | []), | ||
(lock: RWLockReader) => AsyncGenerator<T, TReturn, TNext> | ||
@@ -46,0 +53,0 @@ ]): AsyncGenerator<T, TReturn, TNext>; |
"use strict"; | ||
var __createBinding = (this && this.__createBinding) || (Object.create ? (function(o, m, k, k2) { | ||
if (k2 === undefined) k2 = k; | ||
var desc = Object.getOwnPropertyDescriptor(m, k); | ||
if (!desc || ("get" in desc ? !m.__esModule : desc.writable || desc.configurable)) { | ||
desc = { enumerable: true, get: function() { return m[k]; } }; | ||
} | ||
Object.defineProperty(o, k2, desc); | ||
}) : (function(o, m, k, k2) { | ||
if (k2 === undefined) k2 = k; | ||
o[k2] = m[k]; | ||
})); | ||
var __setModuleDefault = (this && this.__setModuleDefault) || (Object.create ? (function(o, v) { | ||
Object.defineProperty(o, "default", { enumerable: true, value: v }); | ||
}) : function(o, v) { | ||
o["default"] = v; | ||
}); | ||
var __importStar = (this && this.__importStar) || function (mod) { | ||
if (mod && mod.__esModule) return mod; | ||
var result = {}; | ||
if (mod != null) for (var k in mod) if (k !== "default" && Object.prototype.hasOwnProperty.call(mod, k)) __createBinding(result, mod, k); | ||
__setModuleDefault(result, mod); | ||
return result; | ||
}; | ||
var __importDefault = (this && this.__importDefault) || function (mod) { | ||
return (mod && mod.__esModule) ? mod : { "default": mod }; | ||
}; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
const async_mutex_1 = require("async-mutex"); | ||
const async_cancellable_1 = require("@matrixai/async-cancellable"); | ||
const resources_1 = require("@matrixai/resources"); | ||
const utils_1 = require("./utils"); | ||
const errors_1 = require("./errors"); | ||
const Lock_1 = __importDefault(require("./Lock")); | ||
const utils = __importStar(require("./utils")); | ||
const errors = __importStar(require("./errors")); | ||
/** | ||
@@ -11,148 +38,158 @@ * Read-preferring read write lock | ||
class RWLockReader { | ||
constructor() { | ||
this.readersLock = new async_mutex_1.Mutex(); | ||
this.writersLock = new async_mutex_1.Mutex(); | ||
this.readerCountBlocked = 0; | ||
this._readerCount = 0; | ||
this._writerCount = 0; | ||
readersLock = new Lock_1.default(); | ||
writersLock = new Lock_1.default(); | ||
writersRelease; | ||
readerCountBlocked = 0; | ||
_readerCount = 0; | ||
_writerCount = 0; | ||
acquireWritersLockP; | ||
get count() { | ||
return this.readerCount + this.writerCount; | ||
} | ||
lock(type = 'write', timeout) { | ||
get readerCount() { | ||
return this._readerCount + this.readerCountBlocked; | ||
} | ||
get writerCount() { | ||
return this._writerCount; | ||
} | ||
/** | ||
* Check if locked | ||
* If passed `type`, it will also check that the active lock is of that type | ||
*/ | ||
isLocked(type) { | ||
if (type === 'read') { | ||
return this._readerCount > 0 || this.readersLock.isLocked(); | ||
} | ||
else if (type === 'write') { | ||
return this._readerCount === 0 && this.writersLock.isLocked(); | ||
} | ||
else { | ||
return (this._readerCount > 0 || | ||
this.readersLock.isLocked() || | ||
this.writersLock.isLocked()); | ||
} | ||
} | ||
lock(...params) { | ||
const type = (params.length === 2 | ||
? params[0] | ||
: typeof params[0] === 'string' | ||
? params[0] | ||
: undefined) ?? 'write'; | ||
const ctx = params.length === 2 | ||
? params[1] | ||
: typeof params[0] !== 'string' | ||
? params[0] | ||
: undefined; | ||
switch (type) { | ||
case 'read': | ||
return this.read(timeout); | ||
return this.read(ctx); | ||
case 'write': | ||
return this.write(timeout); | ||
return this.write(ctx); | ||
} | ||
} | ||
read(timeout) { | ||
return async () => { | ||
const t1 = performance.now(); | ||
++this.readerCountBlocked; | ||
let readersLock = this.readersLock; | ||
if (timeout != null) { | ||
readersLock = (0, async_mutex_1.withTimeout)(this.readersLock, timeout, new errors_1.ErrorAsyncLocksTimeout()); | ||
} | ||
let readersRelease; | ||
try { | ||
readersRelease = await readersLock.acquire(); | ||
} | ||
catch (e) { | ||
--this.readerCountBlocked; | ||
throw e; | ||
} | ||
--this.readerCountBlocked; | ||
const readerCount = ++this._readerCount; | ||
// The first reader locks | ||
if (readerCount === 1) { | ||
let writersLock = this.writersLock; | ||
if (timeout != null) { | ||
timeout = timeout - (performance.now() - t1); | ||
writersLock = (0, async_mutex_1.withTimeout)(this.writersLock, timeout, new errors_1.ErrorAsyncLocksTimeout()); | ||
} | ||
read(ctx) { | ||
ctx = ctx != null ? { ...ctx } : {}; | ||
return () => { | ||
return utils.setupTimedCancellable(async (ctx) => { | ||
++this.readerCountBlocked; | ||
const acquireReadersLock = this.readersLock.lock(ctx); | ||
const acquireReadersLockP = acquireReadersLock(); | ||
let readersRelease; | ||
try { | ||
this.writersRelease = await writersLock.acquire(); | ||
[readersRelease] = await acquireReadersLockP; | ||
--this.readerCountBlocked; | ||
} | ||
catch (e) { | ||
readersRelease(); | ||
--this._readerCount; | ||
--this.readerCountBlocked; | ||
throw e; | ||
} | ||
readersRelease(); | ||
} | ||
else { | ||
readersRelease(); | ||
// Yield for the first reader to finish locking | ||
await (0, utils_1.yieldMicro)(); | ||
} | ||
let released = false; | ||
return [ | ||
async () => { | ||
if (released) | ||
return; | ||
released = true; | ||
readersRelease = await this.readersLock.acquire(); | ||
const readerCount = --this._readerCount; | ||
// The last reader unlocks | ||
if (readerCount === 0) { | ||
this.writersRelease(); | ||
const readerCount = ++this._readerCount; | ||
// The first reader locks | ||
if (readerCount === 1) { | ||
const acquireWritersLock = this.writersLock.lock(ctx); | ||
this.acquireWritersLockP = acquireWritersLock(); | ||
try { | ||
[this.writersRelease] = await this.acquireWritersLockP; | ||
await readersRelease(); | ||
} | ||
readersRelease(); | ||
// Allow semaphore to settle https://github.com/DirtyHairy/async-mutex/issues/54 | ||
await (0, utils_1.yieldMicro)(); | ||
}, | ||
this, | ||
]; | ||
catch (e) { | ||
await readersRelease(); | ||
--this._readerCount; | ||
throw e; | ||
} | ||
} | ||
else { | ||
await readersRelease(); | ||
await this.acquireWritersLockP.catch(() => { }); | ||
} | ||
let released = false; | ||
return [ | ||
async () => { | ||
if (released) | ||
return; | ||
released = true; | ||
[readersRelease] = await this.readersLock.lock()(); | ||
const readerCount = --this._readerCount; | ||
// The last reader unlocks | ||
if (readerCount === 0) { | ||
await this.writersRelease(); | ||
} | ||
await readersRelease(); | ||
}, | ||
this, | ||
]; | ||
}, true, Infinity, errors.ErrorAsyncLocksTimeout, ctx, []); | ||
}; | ||
} | ||
write(timeout) { | ||
return async () => { | ||
write(ctx) { | ||
return () => { | ||
++this._writerCount; | ||
let writersLock = this.writersLock; | ||
if (timeout != null) { | ||
writersLock = (0, async_mutex_1.withTimeout)(this.writersLock, timeout, new errors_1.ErrorAsyncLocksTimeout()); | ||
} | ||
let release; | ||
try { | ||
release = await writersLock.acquire(); | ||
} | ||
catch (e) { | ||
const acquireWritersLock = this.writersLock.lock(ctx); | ||
const acquireWritersLockP = acquireWritersLock(); | ||
return acquireWritersLockP.then(([release]) => { | ||
let released = false; | ||
return [ | ||
async () => { | ||
if (released) | ||
return; | ||
released = true; | ||
await release(); | ||
--this._writerCount; | ||
}, | ||
this, | ||
]; | ||
}, (e) => { | ||
--this._writerCount; | ||
throw e; | ||
} | ||
let released = false; | ||
return [ | ||
async () => { | ||
if (released) | ||
return; | ||
released = true; | ||
release(); | ||
--this._writerCount; | ||
// Allow semaphore to settle https://github.com/DirtyHairy/async-mutex/issues/54 | ||
await (0, utils_1.yieldMicro)(); | ||
}, | ||
this, | ||
]; | ||
}, (signal) => { | ||
signal.addEventListener('abort', () => { | ||
acquireWritersLockP.cancel(signal.reason); | ||
}, { once: true }); | ||
}); | ||
}; | ||
} | ||
get count() { | ||
return this.readerCount + this.writerCount; | ||
waitForUnlock(ctx) { | ||
const waitReadersLockP = this.readersLock.waitForUnlock(ctx); | ||
const waitWritersLockP = this.writersLock.waitForUnlock(ctx); | ||
return async_cancellable_1.PromiseCancellable.all([waitReadersLockP, waitWritersLockP]).then(() => { }, undefined, (signal) => { | ||
signal.addEventListener('abort', () => { | ||
waitReadersLockP.cancel(signal.reason); | ||
waitWritersLockP.cancel(signal.reason); | ||
}, { once: true }); | ||
}); | ||
} | ||
get readerCount() { | ||
return this._readerCount + this.readerCountBlocked; | ||
} | ||
get writerCount() { | ||
return this._writerCount; | ||
} | ||
/** | ||
* Check if locked | ||
* If passed `type`, it will also check that the active lock is of that type | ||
*/ | ||
isLocked() { | ||
return this.readersLock.isLocked() || this.writersLock.isLocked(); | ||
} | ||
async waitForUnlock(timeout) { | ||
if (timeout != null) { | ||
let timedOut = false; | ||
await Promise.race([ | ||
Promise.all([ | ||
this.readersLock.waitForUnlock(), | ||
this.writersLock.waitForUnlock(), | ||
]), | ||
(0, utils_1.sleep)(timeout).then(() => { | ||
timedOut = true; | ||
}), | ||
]); | ||
if (timedOut) { | ||
throw new errors_1.ErrorAsyncLocksTimeout(); | ||
} | ||
withF(...params) { | ||
let type; | ||
if (params.length === 2) { | ||
type = params.shift(); | ||
} | ||
else { | ||
await Promise.all([ | ||
this.readersLock.waitForUnlock(), | ||
this.writersLock.waitForUnlock(), | ||
]); | ||
if (typeof params[0] === 'string') { | ||
type = params.shift(); | ||
} | ||
else if (typeof params[0] == null) { | ||
params.shift(); | ||
} | ||
} | ||
} | ||
async withF(...params) { | ||
const type = params.shift(); | ||
type = type ?? 'write'; | ||
switch (type) { | ||
@@ -165,14 +202,24 @@ case 'read': | ||
} | ||
async withReadF(...params) { | ||
withReadF(...params) { | ||
const f = params.pop(); | ||
const timeout = params[0]; | ||
return (0, resources_1.withF)([this.read(timeout)], ([lock]) => f(lock)); | ||
return (0, resources_1.withF)([this.read(...params)], ([lock]) => f(lock)); | ||
} | ||
async withWriteF(...params) { | ||
withWriteF(...params) { | ||
const f = params.pop(); | ||
const timeout = params[0]; | ||
return (0, resources_1.withF)([this.write(timeout)], ([lock]) => f(lock)); | ||
return (0, resources_1.withF)([this.write(...params)], ([lock]) => f(lock)); | ||
} | ||
withG(...params) { | ||
const type = params.shift(); | ||
let type; | ||
if (params.length === 2) { | ||
type = params.shift(); | ||
} | ||
else { | ||
if (typeof params[0] === 'string') { | ||
type = params.shift(); | ||
} | ||
else if (typeof params[0] == null) { | ||
params.shift(); | ||
} | ||
} | ||
type = type ?? 'write'; | ||
switch (type) { | ||
@@ -187,9 +234,7 @@ case 'read': | ||
const g = params.pop(); | ||
const timeout = params[0]; | ||
return (0, resources_1.withG)([this.read(timeout)], ([lock]) => g(lock)); | ||
return (0, resources_1.withG)([this.read(...params)], ([lock]) => g(lock)); | ||
} | ||
withWriteG(...params) { | ||
const g = params.pop(); | ||
const timeout = params[0]; | ||
return (0, resources_1.withG)([this.write(timeout)], ([lock]) => g(lock)); | ||
return (0, resources_1.withG)([this.write(...params)], ([lock]) => g(lock)); | ||
} | ||
@@ -196,0 +241,0 @@ } |
@@ -1,5 +0,5 @@ | ||
import type { MutexInterface } from 'async-mutex'; | ||
import type { ResourceAcquire } from '@matrixai/resources'; | ||
import type { Lockable } from './types'; | ||
import { Mutex } from 'async-mutex'; | ||
import type { ResourceRelease } from '@matrixai/resources'; | ||
import type { ResourceAcquireCancellable, Lockable, ContextTimedInput } from './types'; | ||
import { PromiseCancellable } from '@matrixai/async-cancellable'; | ||
import Lock from './Lock'; | ||
/** | ||
@@ -9,11 +9,9 @@ * Write-preferring read write lock | ||
declare class RWLockWriter implements Lockable { | ||
protected readersLock: Mutex; | ||
protected writersLock: Mutex; | ||
protected readersRelease: MutexInterface.Releaser; | ||
protected readersLock: Lock; | ||
protected writersLock: Lock; | ||
protected readersRelease: ResourceRelease; | ||
protected readerCountBlocked: number; | ||
protected acquireReadersLockP: PromiseCancellable<readonly [ResourceRelease, Lock?]>; | ||
protected _readerCount: number; | ||
protected _writerCount: number; | ||
lock(type?: 'read' | 'write', timeout?: number): ResourceAcquire<RWLockWriter>; | ||
read(timeout?: number): ResourceAcquire<RWLockWriter>; | ||
write(timeout?: number): ResourceAcquire<RWLockWriter>; | ||
get count(): number; | ||
@@ -27,19 +25,28 @@ get readerCount(): number; | ||
isLocked(type?: 'read' | 'write'): boolean; | ||
waitForUnlock(timeout?: number): Promise<void>; | ||
lock(...params: [type?: 'read' | 'write', ctx?: Partial<ContextTimedInput>] | [type?: 'read' | 'write'] | [ctx?: Partial<ContextTimedInput>] | []): ResourceAcquireCancellable<RWLockWriter>; | ||
read(ctx?: Partial<ContextTimedInput>): ResourceAcquireCancellable<RWLockWriter>; | ||
write(ctx?: Partial<ContextTimedInput>): ResourceAcquireCancellable<RWLockWriter>; | ||
waitForUnlock(ctx?: Partial<ContextTimedInput>): PromiseCancellable<void>; | ||
withF<T>(...params: [ | ||
...([type: 'read' | 'write', timeout: number] | [type: 'read' | 'write'] | []), | ||
...([type?: 'read' | 'write', ctx?: Partial<ContextTimedInput>] | [type?: 'read' | 'write'] | [ctx?: Partial<ContextTimedInput>] | []), | ||
(lock: RWLockWriter) => Promise<T> | ||
]): Promise<T>; | ||
withReadF<T>(...params: [...([timeout: number] | []), (lock: RWLockWriter) => Promise<T>]): Promise<T>; | ||
withWriteF<T>(...params: [...([timeout: number] | []), (lock: RWLockWriter) => Promise<T>]): Promise<T>; | ||
withReadF<T>(...params: [ | ||
...([ctx?: Partial<ContextTimedInput>] | []), | ||
(lock: RWLockWriter) => Promise<T> | ||
]): Promise<T>; | ||
withWriteF<T>(...params: [ | ||
...([ctx?: Partial<ContextTimedInput>] | []), | ||
(lock: RWLockWriter) => Promise<T> | ||
]): Promise<T>; | ||
withG<T, TReturn, TNext>(...params: [ | ||
...([type: 'read' | 'write', timeout: number] | [type: 'read' | 'write'] | []), | ||
...([type?: 'read' | 'write', ctx?: Partial<ContextTimedInput>] | [type?: 'read' | 'write'] | [ctx?: Partial<ContextTimedInput>] | []), | ||
(lock: RWLockWriter) => AsyncGenerator<T, TReturn, TNext> | ||
]): AsyncGenerator<T, TReturn, TNext>; | ||
withReadG<T, TReturn, TNext>(...params: [ | ||
...([timeout: number] | []), | ||
...([ctx?: Partial<ContextTimedInput>] | []), | ||
(lock: RWLockWriter) => AsyncGenerator<T, TReturn, TNext> | ||
]): AsyncGenerator<T, TReturn, TNext>; | ||
withWriteG<T, TReturn, TNext>(...params: [ | ||
...([timeout: number] | []), | ||
...([ctx?: Partial<ContextTimedInput>] | []), | ||
(lock: RWLockWriter) => AsyncGenerator<T, TReturn, TNext> | ||
@@ -46,0 +53,0 @@ ]): AsyncGenerator<T, TReturn, TNext>; |
"use strict"; | ||
var __createBinding = (this && this.__createBinding) || (Object.create ? (function(o, m, k, k2) { | ||
if (k2 === undefined) k2 = k; | ||
var desc = Object.getOwnPropertyDescriptor(m, k); | ||
if (!desc || ("get" in desc ? !m.__esModule : desc.writable || desc.configurable)) { | ||
desc = { enumerable: true, get: function() { return m[k]; } }; | ||
} | ||
Object.defineProperty(o, k2, desc); | ||
}) : (function(o, m, k, k2) { | ||
if (k2 === undefined) k2 = k; | ||
o[k2] = m[k]; | ||
})); | ||
var __setModuleDefault = (this && this.__setModuleDefault) || (Object.create ? (function(o, v) { | ||
Object.defineProperty(o, "default", { enumerable: true, value: v }); | ||
}) : function(o, v) { | ||
o["default"] = v; | ||
}); | ||
var __importStar = (this && this.__importStar) || function (mod) { | ||
if (mod && mod.__esModule) return mod; | ||
var result = {}; | ||
if (mod != null) for (var k in mod) if (k !== "default" && Object.prototype.hasOwnProperty.call(mod, k)) __createBinding(result, mod, k); | ||
__setModuleDefault(result, mod); | ||
return result; | ||
}; | ||
var __importDefault = (this && this.__importDefault) || function (mod) { | ||
return (mod && mod.__esModule) ? mod : { "default": mod }; | ||
}; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
const perf_hooks_1 = require("perf_hooks"); | ||
const async_mutex_1 = require("async-mutex"); | ||
const resources_1 = require("@matrixai/resources"); | ||
const utils_1 = require("./utils"); | ||
const errors_1 = require("./errors"); | ||
const async_cancellable_1 = require("@matrixai/async-cancellable"); | ||
const Lock_1 = __importDefault(require("./Lock")); | ||
const utils = __importStar(require("./utils")); | ||
const errors = __importStar(require("./errors")); | ||
/** | ||
@@ -12,125 +38,9 @@ * Write-preferring read write lock | ||
class RWLockWriter { | ||
constructor() { | ||
this.readersLock = new async_mutex_1.Mutex(); | ||
this.writersLock = new async_mutex_1.Mutex(); | ||
this.readerCountBlocked = 0; | ||
this._readerCount = 0; | ||
this._writerCount = 0; | ||
} | ||
lock(type = 'write', timeout) { | ||
switch (type) { | ||
case 'read': | ||
return this.read(timeout); | ||
case 'write': | ||
return this.write(timeout); | ||
} | ||
} | ||
read(timeout) { | ||
return async () => { | ||
const t1 = perf_hooks_1.performance.now(); | ||
if (this._writerCount > 0) { | ||
++this.readerCountBlocked; | ||
if (timeout != null) { | ||
let timedOut = false; | ||
await Promise.race([ | ||
this.writersLock.waitForUnlock(), | ||
(0, utils_1.sleep)(timeout).then(() => { | ||
timedOut = true; | ||
}), | ||
]); | ||
if (timedOut) { | ||
--this.readerCountBlocked; | ||
throw new errors_1.ErrorAsyncLocksTimeout(); | ||
} | ||
} | ||
else { | ||
await this.writersLock.waitForUnlock(); | ||
} | ||
--this.readerCountBlocked; | ||
} | ||
const readerCount = ++this._readerCount; | ||
// The first reader locks | ||
if (readerCount === 1) { | ||
let readersLock = this.readersLock; | ||
if (timeout != null) { | ||
timeout = timeout - (perf_hooks_1.performance.now() - t1); | ||
readersLock = (0, async_mutex_1.withTimeout)(this.readersLock, timeout, new errors_1.ErrorAsyncLocksTimeout()); | ||
} | ||
try { | ||
this.readersRelease = await readersLock.acquire(); | ||
} | ||
catch (e) { | ||
--this._readerCount; | ||
throw e; | ||
} | ||
} | ||
else { | ||
// Yield for the first reader to finish locking | ||
await (0, utils_1.yieldMicro)(); | ||
} | ||
let released = false; | ||
return [ | ||
async () => { | ||
if (released) | ||
return; | ||
released = true; | ||
const readerCount = --this._readerCount; | ||
// The last reader unlocks | ||
if (readerCount === 0) { | ||
this.readersRelease(); | ||
// Allow semaphore to settle https://github.com/DirtyHairy/async-mutex/issues/54 | ||
await (0, utils_1.yieldMicro)(); | ||
} | ||
}, | ||
this, | ||
]; | ||
}; | ||
} | ||
write(timeout) { | ||
return async () => { | ||
++this._writerCount; | ||
let writersLock = this.writersLock; | ||
if (timeout != null) { | ||
writersLock = (0, async_mutex_1.withTimeout)(this.writersLock, timeout, new errors_1.ErrorAsyncLocksTimeout()); | ||
} | ||
const t1 = perf_hooks_1.performance.now(); | ||
let writersRelease; | ||
try { | ||
writersRelease = await writersLock.acquire(); | ||
} | ||
catch (e) { | ||
--this._writerCount; | ||
throw e; | ||
} | ||
let readersLock = this.readersLock; | ||
if (timeout != null) { | ||
timeout = timeout - (perf_hooks_1.performance.now() - t1); | ||
readersLock = (0, async_mutex_1.withTimeout)(this.readersLock, timeout, new errors_1.ErrorAsyncLocksTimeout()); | ||
} | ||
try { | ||
this.readersRelease = await readersLock.acquire(); | ||
} | ||
catch (e) { | ||
writersRelease(); | ||
--this._writerCount; | ||
// Allow semaphore to settle https://github.com/DirtyHairy/async-mutex/issues/54 | ||
await (0, utils_1.yieldMicro)(); | ||
throw e; | ||
} | ||
let released = false; | ||
return [ | ||
async () => { | ||
if (released) | ||
return; | ||
released = true; | ||
this.readersRelease(); | ||
writersRelease(); | ||
--this._writerCount; | ||
// Allow semaphore to settle https://github.com/DirtyHairy/async-mutex/issues/54 | ||
await (0, utils_1.yieldMicro)(); | ||
}, | ||
this, | ||
]; | ||
}; | ||
} | ||
readersLock = new Lock_1.default(); | ||
writersLock = new Lock_1.default(); | ||
readersRelease; | ||
readerCountBlocked = 0; | ||
acquireReadersLockP; | ||
_readerCount = 0; | ||
_writerCount = 0; | ||
get count() { | ||
@@ -151,3 +61,3 @@ return this.readerCount + this.writerCount; | ||
if (type === 'read') { | ||
return this.readersLock.isLocked(); | ||
return this._writerCount === 0 && this.readersLock.isLocked(); | ||
} | ||
@@ -161,27 +71,134 @@ else if (type === 'write') { | ||
} | ||
async waitForUnlock(timeout) { | ||
if (timeout != null) { | ||
let timedOut = false; | ||
await Promise.race([ | ||
Promise.all([ | ||
this.readersLock.waitForUnlock(), | ||
this.writersLock.waitForUnlock(), | ||
]), | ||
(0, utils_1.sleep)(timeout).then(() => { | ||
timedOut = true; | ||
}), | ||
]); | ||
if (timedOut) { | ||
throw new errors_1.ErrorAsyncLocksTimeout(); | ||
} | ||
lock(...params) { | ||
const type = (params.length === 2 | ||
? params[0] | ||
: typeof params[0] === 'string' | ||
? params[0] | ||
: undefined) ?? 'write'; | ||
const ctx = params.length === 2 | ||
? params[1] | ||
: typeof params[0] !== 'string' | ||
? params[0] | ||
: undefined; | ||
switch (type) { | ||
case 'read': | ||
return this.read(ctx); | ||
case 'write': | ||
return this.write(ctx); | ||
} | ||
} | ||
read(ctx) { | ||
ctx = ctx != null ? { ...ctx } : {}; | ||
return () => { | ||
return utils.setupTimedCancellable(async (ctx) => { | ||
if (this._writerCount > 0) { | ||
++this.readerCountBlocked; | ||
const waitWritersLockP = this.writersLock.waitForUnlock(ctx); | ||
try { | ||
await waitWritersLockP; | ||
} | ||
finally { | ||
--this.readerCountBlocked; | ||
} | ||
} | ||
const readerCount = ++this._readerCount; | ||
// The first reader locks | ||
if (readerCount === 1) { | ||
const acquireReadersLock = this.readersLock.lock(ctx); | ||
this.acquireReadersLockP = acquireReadersLock(); | ||
try { | ||
[this.readersRelease] = await this.acquireReadersLockP; | ||
} | ||
catch (e) { | ||
--this._readerCount; | ||
throw e; | ||
} | ||
} | ||
else { | ||
// Without this, the second or later reader will always lock faster | ||
// than the first reader. This forces the subsequent readers to always | ||
// wait for the first reader to settle, while discarding any errors. | ||
await this.acquireReadersLockP.catch(() => { }); | ||
} | ||
let released = false; | ||
return [ | ||
async () => { | ||
if (released) | ||
return; | ||
released = true; | ||
const readerCount = --this._readerCount; | ||
// The last reader unlocks | ||
if (readerCount === 0) { | ||
await this.readersRelease(); | ||
} | ||
}, | ||
this, | ||
]; | ||
}, true, Infinity, errors.ErrorAsyncLocksTimeout, ctx, []); | ||
}; | ||
} | ||
write(ctx) { | ||
ctx = ctx != null ? { ...ctx } : {}; | ||
return () => { | ||
return utils.setupTimedCancellable(async (ctx) => { | ||
++this._writerCount; | ||
const acquireWritersLock = this.writersLock.lock(ctx); | ||
const acquireWritersLockP = acquireWritersLock(); | ||
let writersRelease; | ||
try { | ||
[writersRelease] = await acquireWritersLockP; | ||
} | ||
catch (e) { | ||
--this._writerCount; | ||
throw e; | ||
} | ||
const acquireReadersLock = this.readersLock.lock(ctx); | ||
const acquireReadersLockP = acquireReadersLock(); | ||
try { | ||
[this.readersRelease] = await acquireReadersLockP; | ||
} | ||
catch (e) { | ||
await writersRelease(); | ||
--this._writerCount; | ||
throw e; | ||
} | ||
let released = false; | ||
return [ | ||
async () => { | ||
if (released) | ||
return; | ||
released = true; | ||
await this.readersRelease(); | ||
await writersRelease(); | ||
--this._writerCount; | ||
}, | ||
this, | ||
]; | ||
}, true, Infinity, errors.ErrorAsyncLocksTimeout, ctx, []); | ||
}; | ||
} | ||
waitForUnlock(ctx) { | ||
const waitReadersLockP = this.readersLock.waitForUnlock(ctx); | ||
const waitWritersLockP = this.writersLock.waitForUnlock(ctx); | ||
return async_cancellable_1.PromiseCancellable.all([waitReadersLockP, waitWritersLockP]).then(() => { }, undefined, (signal) => { | ||
signal.addEventListener('abort', () => { | ||
waitReadersLockP.cancel(signal.reason); | ||
waitWritersLockP.cancel(signal.reason); | ||
}, { once: true }); | ||
}); | ||
} | ||
withF(...params) { | ||
let type; | ||
if (params.length === 2) { | ||
type = params.shift(); | ||
} | ||
else { | ||
await Promise.all([ | ||
this.readersLock.waitForUnlock(), | ||
this.writersLock.waitForUnlock(), | ||
]); | ||
if (typeof params[0] === 'string') { | ||
type = params.shift(); | ||
} | ||
else if (typeof params[0] == null) { | ||
params.shift(); | ||
} | ||
} | ||
} | ||
async withF(...params) { | ||
const type = params.shift(); | ||
type = type ?? 'write'; | ||
switch (type) { | ||
@@ -194,14 +211,24 @@ case 'read': | ||
} | ||
async withReadF(...params) { | ||
withReadF(...params) { | ||
const f = params.pop(); | ||
const timeout = params[0]; | ||
return (0, resources_1.withF)([this.read(timeout)], ([lock]) => f(lock)); | ||
return (0, resources_1.withF)([this.read(...params)], ([lock]) => f(lock)); | ||
} | ||
async withWriteF(...params) { | ||
withWriteF(...params) { | ||
const f = params.pop(); | ||
const timeout = params[0]; | ||
return (0, resources_1.withF)([this.write(timeout)], ([lock]) => f(lock)); | ||
return (0, resources_1.withF)([this.write(...params)], ([lock]) => f(lock)); | ||
} | ||
withG(...params) { | ||
const type = params.shift(); | ||
let type; | ||
if (params.length === 2) { | ||
type = params.shift(); | ||
} | ||
else { | ||
if (typeof params[0] === 'string') { | ||
type = params.shift(); | ||
} | ||
else if (typeof params[0] == null) { | ||
params.shift(); | ||
} | ||
} | ||
type = type ?? 'write'; | ||
switch (type) { | ||
@@ -216,9 +243,7 @@ case 'read': | ||
const g = params.pop(); | ||
const timeout = params[0]; | ||
return (0, resources_1.withG)([this.read(timeout)], ([lock]) => g(lock)); | ||
return (0, resources_1.withG)([this.read(...params)], ([lock]) => g(lock)); | ||
} | ||
withWriteG(...params) { | ||
const g = params.pop(); | ||
const timeout = params[0]; | ||
return (0, resources_1.withG)([this.write(timeout)], ([lock]) => g(lock)); | ||
return (0, resources_1.withG)([this.write(...params)], ([lock]) => g(lock)); | ||
} | ||
@@ -225,0 +250,0 @@ } |
@@ -1,21 +0,35 @@ | ||
import type { ResourceAcquire } from '@matrixai/resources'; | ||
import type { Lockable } from './types'; | ||
import { Semaphore as _Semaphore } from 'async-mutex'; | ||
import type { PromiseCancellable } from '@matrixai/async-cancellable'; | ||
import type { ResourceAcquireCancellable, Lockable, ContextTimedInput } from './types'; | ||
type Task = { | ||
task: () => void; | ||
weight: number; | ||
abortHandler: () => void; | ||
aborted?: boolean; | ||
}; | ||
declare class Semaphore implements Lockable { | ||
protected _semaphore: _Semaphore; | ||
readonly limit: number; | ||
readonly priority: boolean; | ||
protected _count: number; | ||
constructor(limit: number); | ||
lock(timeout?: number): ResourceAcquire<Semaphore>; | ||
protected currentWeight: number; | ||
protected queue: Array<Task>; | ||
protected abortQueueMap: WeakMap<() => void, Task>; | ||
constructor(limit: number, priority?: boolean); | ||
get count(): number; | ||
/** | ||
* This will be true synchronously upon calling `this.lock()()`. | ||
*/ | ||
isLocked(): boolean; | ||
waitForUnlock(timeout?: number): Promise<void>; | ||
lock(...params: [weight?: number, ctx?: Partial<ContextTimedInput>] | [weight?: number] | [ctx?: Partial<ContextTimedInput>] | []): ResourceAcquireCancellable<Semaphore>; | ||
waitForUnlock(...params: [weight?: number, ctx?: Partial<ContextTimedInput>] | [weight?: number] | [ctx?: Partial<ContextTimedInput>] | []): PromiseCancellable<void>; | ||
withF<T>(...params: [ | ||
...([timeout: number] | []), | ||
...([weight?: number, ctx?: Partial<ContextTimedInput>] | [weight?: number] | [ctx?: Partial<ContextTimedInput>] | []), | ||
(semaphore: Semaphore) => Promise<T> | ||
]): Promise<T>; | ||
withG<T, TReturn, TNext>(...params: [ | ||
...([timeout: number] | []), | ||
...([weight?: number, ctx?: Partial<ContextTimedInput>] | [weight?: number] | [ctx?: Partial<ContextTimedInput>] | []), | ||
(semaphore: Semaphore) => AsyncGenerator<T, TReturn, TNext> | ||
]): AsyncGenerator<T, TReturn, TNext>; | ||
protected insertQueue(task: Task): void; | ||
protected processQueue(): void; | ||
} | ||
export default Semaphore; |
"use strict"; | ||
var __createBinding = (this && this.__createBinding) || (Object.create ? (function(o, m, k, k2) { | ||
if (k2 === undefined) k2 = k; | ||
var desc = Object.getOwnPropertyDescriptor(m, k); | ||
if (!desc || ("get" in desc ? !m.__esModule : desc.writable || desc.configurable)) { | ||
desc = { enumerable: true, get: function() { return m[k]; } }; | ||
} | ||
Object.defineProperty(o, k2, desc); | ||
}) : (function(o, m, k, k2) { | ||
if (k2 === undefined) k2 = k; | ||
o[k2] = m[k]; | ||
})); | ||
var __setModuleDefault = (this && this.__setModuleDefault) || (Object.create ? (function(o, v) { | ||
Object.defineProperty(o, "default", { enumerable: true, value: v }); | ||
}) : function(o, v) { | ||
o["default"] = v; | ||
}); | ||
var __importStar = (this && this.__importStar) || function (mod) { | ||
if (mod && mod.__esModule) return mod; | ||
var result = {}; | ||
if (mod != null) for (var k in mod) if (k !== "default" && Object.prototype.hasOwnProperty.call(mod, k)) __createBinding(result, mod, k); | ||
__setModuleDefault(result, mod); | ||
return result; | ||
}; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
const async_mutex_1 = require("async-mutex"); | ||
const resources_1 = require("@matrixai/resources"); | ||
const utils_1 = require("./utils"); | ||
const errors_1 = require("./errors"); | ||
const utils = __importStar(require("./utils")); | ||
const errors = __importStar(require("./errors")); | ||
class Semaphore { | ||
constructor(limit) { | ||
this._count = 0; | ||
limit; | ||
priority; | ||
_count = 0; | ||
currentWeight = 0; | ||
queue = []; | ||
abortQueueMap = new WeakMap(); | ||
constructor(limit, priority = false) { | ||
if (limit < 1) { | ||
throw new errors_1.ErrorAsyncLocksSemaphoreLimit(); | ||
throw new RangeError('Semaphore must be constructed with `limit` >= 1'); | ||
} | ||
this._semaphore = new async_mutex_1.Semaphore(limit); | ||
this.limit = limit; | ||
this.priority = priority; | ||
} | ||
lock(timeout) { | ||
return async () => { | ||
++this._count; | ||
let semaphore = this._semaphore; | ||
if (timeout != null) { | ||
semaphore = (0, async_mutex_1.withTimeout)(this._semaphore, timeout, new errors_1.ErrorAsyncLocksTimeout()); | ||
} | ||
let release; | ||
try { | ||
[, release] = await semaphore.acquire(); | ||
} | ||
catch (e) { | ||
--this._count; | ||
throw e; | ||
} | ||
let released = false; | ||
return [ | ||
async () => { | ||
if (released) | ||
return; | ||
released = true; | ||
--this._count; | ||
release(); | ||
// Allow semaphore to settle https://github.com/DirtyHairy/async-mutex/issues/54 | ||
await (0, utils_1.yieldMicro)(); | ||
}, | ||
this, | ||
]; | ||
}; | ||
} | ||
get count() { | ||
return this._count; | ||
} | ||
/** | ||
* This will be true synchronously upon calling `this.lock()()`. | ||
*/ | ||
isLocked() { | ||
return this._semaphore.isLocked(); | ||
return this._count > 0; | ||
} | ||
async waitForUnlock(timeout) { | ||
if (timeout != null) { | ||
let timedOut = false; | ||
await Promise.race([ | ||
this._semaphore.waitForUnlock(), | ||
(0, utils_1.sleep)(timeout).then(() => { | ||
timedOut = true; | ||
}), | ||
]); | ||
if (timedOut) { | ||
throw new errors_1.ErrorAsyncLocksTimeout(); | ||
} | ||
lock(...params) { | ||
const weight = (params.length === 2 | ||
? params[0] | ||
: typeof params[0] === 'number' | ||
? params[0] | ||
: undefined) ?? 1; | ||
let ctx = params.length === 2 | ||
? params[1] | ||
: typeof params[0] !== 'number' | ||
? params[0] | ||
: undefined; | ||
if (weight < 1) { | ||
throw new RangeError('Semaphore must be locked with `weight` >= 1'); | ||
} | ||
else { | ||
await this._semaphore.waitForUnlock(); | ||
ctx = ctx != null ? { ...ctx } : {}; | ||
return () => { | ||
return utils.setupTimedCancellable((ctx, weight) => { | ||
this._count++; | ||
// Change `any` time to the resource thing | ||
const { p: lockP, resolveP: resolveLockP, rejectP: rejectLockP, } = utils.promise(); | ||
// If signal is already aborted, then we can reject with reason | ||
if (ctx.signal.aborted) { | ||
this._count--; | ||
rejectLockP(ctx.signal.reason); | ||
return lockP; | ||
} | ||
const abortHandler = () => { | ||
this._count--; | ||
const taskToAbort = this.abortQueueMap.get(abortHandler); | ||
if (taskToAbort != null) { | ||
taskToAbort.aborted = true; | ||
} | ||
rejectLockP(ctx.signal.reason); | ||
}; | ||
let released = false; | ||
const task = { | ||
task: () => { | ||
this.currentWeight += weight; | ||
ctx.signal.removeEventListener('abort', abortHandler); | ||
resolveLockP([ | ||
async () => { | ||
if (released) | ||
return; | ||
released = true; | ||
this._count--; | ||
this.currentWeight -= weight; | ||
this.processQueue(); | ||
}, | ||
this, | ||
]); | ||
}, | ||
weight, | ||
abortHandler, | ||
aborted: false, | ||
}; | ||
ctx.signal.addEventListener('abort', abortHandler, { once: true }); | ||
this.abortQueueMap.set(abortHandler, task); | ||
this.insertQueue(task); | ||
this.processQueue(); | ||
return lockP; | ||
}, true, Infinity, errors.ErrorAsyncLocksTimeout, ctx, [weight]); | ||
}; | ||
} | ||
waitForUnlock(...params) { | ||
const weight = (params.length === 2 | ||
? params[0] | ||
: typeof params[0] === 'number' | ||
? params[0] | ||
: undefined) ?? 1; | ||
let ctx = params.length === 2 | ||
? params[1] | ||
: typeof params[0] !== 'number' | ||
? params[0] | ||
: undefined; | ||
if (weight < 1) { | ||
throw new RangeError('Semaphore must be locked with `weight` >= 1'); | ||
} | ||
ctx = ctx != null ? { ...ctx } : {}; | ||
return utils.setupTimedCancellable((ctx, weight) => { | ||
const { p: waitP, resolveP: resolveWaitP, rejectP: rejectWaitP, } = utils.promise(); | ||
if (ctx.signal.aborted) { | ||
rejectWaitP(ctx.signal.reason); | ||
return waitP; | ||
} | ||
const abortHandler = () => { | ||
const taskToAbort = this.abortQueueMap.get(abortHandler); | ||
if (taskToAbort != null) { | ||
taskToAbort.aborted = true; | ||
} | ||
rejectWaitP(ctx.signal.reason); | ||
}; | ||
const task = { | ||
task: () => { | ||
ctx.signal.removeEventListener('abort', abortHandler); | ||
resolveWaitP(); | ||
}, | ||
weight, | ||
abortHandler, | ||
aborted: false, | ||
}; | ||
ctx.signal.addEventListener('abort', abortHandler, { once: true }); | ||
this.abortQueueMap.set(abortHandler, task); | ||
this.insertQueue(task); | ||
this.processQueue(); | ||
return waitP; | ||
}, true, Infinity, errors.ErrorAsyncLocksTimeout, ctx, [weight]); | ||
} | ||
async withF(...params) { | ||
withF(...params) { | ||
const f = params.pop(); | ||
const timeout = params[0]; | ||
return (0, resources_1.withF)([this.lock(timeout)], ([semaphore]) => f(semaphore)); | ||
return (0, resources_1.withF)([this.lock(...params)], ([semaphore]) => f(semaphore)); | ||
} | ||
withG(...params) { | ||
const g = params.pop(); | ||
const timeout = params[0]; | ||
return (0, resources_1.withG)([this.lock(timeout)], ([semaphore]) => g(semaphore)); | ||
return (0, resources_1.withG)([this.lock(...params)], ([semaphore]) => g(semaphore)); | ||
} | ||
insertQueue(task) { | ||
// If prioritising small weights, then perform insertion sort. | ||
// The resulting queue will be sorted from largest weights to smallest weights. | ||
if (this.priority) { | ||
let i = this.queue.length; | ||
while (i > 0 && this.queue[i - 1].weight < task.weight) { | ||
i--; | ||
} | ||
this.queue.splice(i, 0, task); | ||
} | ||
else { | ||
// Enqueuing into the queue is unfortunately not O(1). | ||
this.queue.unshift(task); | ||
} | ||
} | ||
processQueue() { | ||
while (this.queue.length > 0 && | ||
this.currentWeight + this.queue[this.queue.length - 1].weight <= | ||
this.limit) { | ||
const task = this.queue.pop(); | ||
if (!task.aborted) { | ||
task.task(); | ||
} | ||
} | ||
} | ||
} | ||
exports.default = Semaphore; | ||
//# sourceMappingURL=Semaphore.js.map |
@@ -1,37 +0,54 @@ | ||
import type { ResourceAcquire } from '@matrixai/resources'; | ||
import type { PromiseCancellable } from '@matrixai/async-cancellable'; | ||
import type { ResourceRelease } from '@matrixai/resources'; | ||
import type { Timer } from '@matrixai/timer'; | ||
/** | ||
* Plain data dictionary | ||
*/ | ||
declare type POJO = { | ||
type POJO = { | ||
[key: string]: any; | ||
}; | ||
/** | ||
* Any type that can be turned into a string | ||
* Deconstructed promise | ||
*/ | ||
interface ToString { | ||
toString(): string; | ||
} | ||
type PromiseDeconstructed<T> = { | ||
p: Promise<T>; | ||
resolveP: (value: T | PromiseLike<T>) => void; | ||
rejectP: (reason?: any) => void; | ||
}; | ||
/** | ||
* Derived from `ResourceAcquire`, this is just cancellable too | ||
*/ | ||
type ResourceAcquireCancellable<Resource> = (resources?: readonly any[]) => PromiseCancellable<readonly [ResourceRelease, Resource?]>; | ||
interface Lockable { | ||
count: number; | ||
lock(...params: Array<unknown>): ResourceAcquire<Lockable>; | ||
lock(...params: Array<unknown>): ResourceAcquireCancellable<Lockable>; | ||
isLocked(...params: Array<unknown>): boolean; | ||
waitForUnlock(timeout?: number): Promise<void>; | ||
waitForUnlock(...params: Array<unknown>): PromiseCancellable<void>; | ||
withF<T>(...params: Array<unknown>): Promise<T>; | ||
withG<T, TReturn, TNext>(...params: Array<unknown>): AsyncGenerator<T, TReturn, TNext>; | ||
} | ||
declare type MultiLockRequest<L extends Lockable = Lockable> = [ | ||
key: ToString, | ||
type LockRequest<L extends Lockable = Lockable> = [ | ||
key: string, | ||
lockConstructor: new () => L, | ||
...lockingParams: Parameters<L['lock']> | ||
]; | ||
declare type MultiLockAcquire<L extends Lockable = Lockable> = [ | ||
key: ToString, | ||
lockAcquire: ResourceAcquire<L>, | ||
type LockAcquireCancellable<L extends Lockable = Lockable> = [ | ||
key: string, | ||
lockAcquire: ResourceAcquireCancellable<L>, | ||
...lockingParams: Parameters<L['lock']> | ||
]; | ||
declare type MultiLockAcquired<L extends Lockable = Lockable> = [ | ||
key: ToString, | ||
type LockAcquired<L extends Lockable = Lockable> = [ | ||
key: string, | ||
lock: L, | ||
...lockingParams: Parameters<L['lock']> | ||
]; | ||
export type { POJO, ToString, Lockable, MultiLockRequest, MultiLockAcquire, MultiLockAcquired, }; | ||
type RWLockRequest = [key: string, type?: 'read' | 'write', ctx?: Partial<ContextTimedInput>] | [key: string, ctx?: Partial<ContextTimedInput>]; | ||
type ContextTimed = { | ||
signal: AbortSignal; | ||
timer: Timer; | ||
}; | ||
type ContextTimedInput = { | ||
signal: AbortSignal; | ||
timer: Timer | number; | ||
}; | ||
export type { POJO, PromiseDeconstructed, ResourceAcquireCancellable, Lockable, LockRequest, LockAcquireCancellable, LockAcquired, RWLockRequest, ContextTimed, ContextTimedInput, }; |
@@ -1,3 +0,8 @@ | ||
declare function sleep(ms: number): Promise<void>; | ||
declare function yieldMicro(): Promise<void>; | ||
export { sleep, yieldMicro }; | ||
import type { PromiseDeconstructed, ContextTimed, ContextTimedInput } from './types'; | ||
import { PromiseCancellable } from '@matrixai/async-cancellable'; | ||
/** | ||
* Deconstructed promise | ||
*/ | ||
declare function promise<T = void>(): PromiseDeconstructed<T>; | ||
declare function setupTimedCancellable<C extends ContextTimed, P extends Array<any>, R>(f: (ctx: C, ...params: P) => PromiseLike<R>, lazy: boolean, delay: number, errorTimeoutConstructor: new () => Error, ctx: Partial<ContextTimedInput>, args: P): PromiseCancellable<R>; | ||
export { promise, setupTimedCancellable }; |
"use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
exports.yieldMicro = exports.sleep = void 0; | ||
async function sleep(ms) { | ||
return await new Promise((r) => setTimeout(r, ms)); | ||
exports.setupTimedCancellable = exports.promise = void 0; | ||
const async_cancellable_1 = require("@matrixai/async-cancellable"); | ||
const timer_1 = require("@matrixai/timer"); | ||
/** | ||
* Deconstructed promise | ||
*/ | ||
function promise() { | ||
let resolveP, rejectP; | ||
const p = new Promise((resolve, reject) => { | ||
resolveP = resolve; | ||
rejectP = reject; | ||
}); | ||
return { | ||
p, | ||
resolveP, | ||
rejectP, | ||
}; | ||
} | ||
exports.sleep = sleep; | ||
async function yieldMicro() { | ||
return await new Promise((r) => queueMicrotask(r)); | ||
exports.promise = promise; | ||
function setupTimedCancellable(f, lazy, delay, errorTimeoutConstructor, ctx, args) { | ||
// There are 3 properties of timer and signal: | ||
// | ||
// A. If timer times out, signal is aborted | ||
// B. If signal is aborted, timer is cancelled | ||
// C. If timer is owned by the wrapper, then it must be cancelled when the target finishes | ||
// | ||
// There are 4 cases where the wrapper is used: | ||
// | ||
// 1. Nothing is inherited - A B C | ||
// 2. Signal is inherited - A B C | ||
// 3. Timer is inherited - A | ||
// 4. Both signal and timer are inherited - A* | ||
// | ||
// Property B and C only applies to case 1 and 2 because the timer is owned | ||
// by the wrapper and it is not inherited, if it is inherited, the caller may | ||
// need to reuse the timer. | ||
// In situation 4, there's a caveat for property A: it is assumed that the | ||
// caller has already setup the property A relationship, therefore this | ||
// wrapper will not re-setup this property A relationship. | ||
let abortController; | ||
let teardownContext; | ||
if ((ctx.timer === undefined || typeof ctx.timer === 'number') && | ||
ctx.signal === undefined) { | ||
abortController = new AbortController(); | ||
const e = new errorTimeoutConstructor(); | ||
// Property A | ||
const timer = new timer_1.Timer(() => void abortController.abort(e), ctx.timer ?? delay); | ||
abortController.signal.addEventListener('abort', () => { | ||
// Property B | ||
timer.cancel(); | ||
}); | ||
ctx.signal = abortController.signal; | ||
ctx.timer = timer; | ||
teardownContext = () => { | ||
// Property C | ||
timer.cancel(); | ||
}; | ||
} | ||
else if ((ctx.timer === undefined || typeof ctx.timer === 'number') && | ||
ctx.signal instanceof AbortSignal) { | ||
abortController = new AbortController(); | ||
const e = new errorTimeoutConstructor(); | ||
// Property A | ||
const timer = new timer_1.Timer(() => void abortController.abort(e), ctx.timer ?? delay); | ||
const signalUpstream = ctx.signal; | ||
const signalHandler = () => { | ||
// Property B | ||
timer.cancel(); | ||
abortController.abort(signalUpstream.reason); | ||
}; | ||
// If already aborted, abort target and cancel the timer | ||
if (signalUpstream.aborted) { | ||
// Property B | ||
timer.cancel(); | ||
abortController.abort(signalUpstream.reason); | ||
} | ||
else { | ||
signalUpstream.addEventListener('abort', signalHandler); | ||
} | ||
// Overwrite the signal property with this ctx's `AbortController.signal` | ||
ctx.signal = abortController.signal; | ||
ctx.timer = timer; | ||
teardownContext = () => { | ||
signalUpstream.removeEventListener('abort', signalHandler); | ||
// Property C | ||
timer.cancel(); | ||
}; | ||
} | ||
else if (ctx.timer instanceof timer_1.Timer && ctx.signal === undefined) { | ||
abortController = new AbortController(); | ||
const e = new errorTimeoutConstructor(); | ||
let finished = false; | ||
// If the timer resolves, then abort the target function | ||
void ctx.timer.then((r, s) => { | ||
// If the timer is aborted after it resolves | ||
// then don't bother aborting the target function | ||
if (!finished && !s.aborted) { | ||
// Property A | ||
abortController.abort(e); | ||
} | ||
return r; | ||
}, () => { | ||
// Ignore any upstream cancellation | ||
}); | ||
ctx.signal = abortController.signal; | ||
teardownContext = () => { | ||
// The timer is not cancelled here because | ||
// it was not created in this scope | ||
finished = true; | ||
}; | ||
} | ||
else { | ||
// In this case, `context.timer` and `context.signal` are both instances of | ||
// `Timer` and `AbortSignal` respectively | ||
// It is assumed that both the timer and signal are already hooked up to each other | ||
abortController = new AbortController(); | ||
const signalUpstream = ctx.signal; | ||
const signalHandler = () => { | ||
abortController.abort(signalUpstream.reason); | ||
}; | ||
if (signalUpstream.aborted) { | ||
abortController.abort(signalUpstream.reason); | ||
} | ||
else { | ||
signalUpstream.addEventListener('abort', signalHandler); | ||
} | ||
// Overwrite the signal property with this context's `AbortController.signal` | ||
ctx.signal = abortController.signal; | ||
teardownContext = () => { | ||
signalUpstream.removeEventListener('abort', signalHandler); | ||
}; | ||
} | ||
const result = f(ctx, ...args); | ||
// The `abortController` must be shared in the `finally` clause | ||
// to link up final promise's cancellation with the target | ||
// function's signal | ||
return new async_cancellable_1.PromiseCancellable((resolve, reject, signal) => { | ||
if (!lazy) { | ||
if (signal.aborted) { | ||
reject(signal.reason); | ||
} | ||
else { | ||
signal.addEventListener('abort', () => { | ||
reject(signal.reason); | ||
}, { once: true }); | ||
} | ||
} | ||
void result.then(resolve, reject); | ||
}, abortController).finally(() => { | ||
teardownContext(); | ||
}, abortController); | ||
} | ||
exports.yieldMicro = yieldMicro; | ||
exports.setupTimedCancellable = setupTimedCancellable; | ||
//# sourceMappingURL=utils.js.map |
{ | ||
"name": "@matrixai/async-locks", | ||
"version": "3.2.0", | ||
"version": "4.0.0", | ||
"author": "Roger Qiu", | ||
@@ -15,3 +15,3 @@ "description": "Asynchronous locking utilities", | ||
"prepare": "tsc -p ./tsconfig.build.json", | ||
"build": "rimraf ./dist && tsc -p ./tsconfig.build.json", | ||
"build": "shx rm -rf ./dist && tsc -p ./tsconfig.build.json", | ||
"postversion": "npm install --package-lock-only --ignore-scripts --silent", | ||
@@ -23,15 +23,17 @@ "ts-node": "ts-node", | ||
"lint-shell": "find ./src ./tests ./scripts -type f -regextype posix-extended -regex '.*\\.(sh)' -exec shellcheck {} +", | ||
"docs": "rimraf ./docs && typedoc --gitRevision master --tsconfig ./tsconfig.build.json --out ./docs src" | ||
"docs": "shx rm -rf ./docs && typedoc --gitRevision master --tsconfig ./tsconfig.build.json --out ./docs src" | ||
}, | ||
"dependencies": { | ||
"@matrixai/errors": "^1.1.3", | ||
"@matrixai/resources": "^1.1.4", | ||
"async-mutex": "^0.3.2" | ||
"@matrixai/async-cancellable": "^1.1.1", | ||
"@matrixai/errors": "^1.1.7", | ||
"@matrixai/resources": "^1.1.5", | ||
"@matrixai/timer": "^1.1.1" | ||
}, | ||
"devDependencies": { | ||
"@swc/core": "^1.2.215", | ||
"@swc/core": "^1.3.62", | ||
"@swc/jest": "^0.2.26", | ||
"@types/jest": "^28.1.3", | ||
"@types/node": "^16.11.7", | ||
"@typescript-eslint/eslint-plugin": "^5.23.0", | ||
"@typescript-eslint/parser": "^5.23.0", | ||
"@types/node": "^18.15.0", | ||
"@typescript-eslint/eslint-plugin": "^5.45.1", | ||
"@typescript-eslint/parser": "^5.45.1", | ||
"eslint": "^8.15.0", | ||
@@ -42,11 +44,12 @@ "eslint-config-prettier": "^8.5.0", | ||
"jest": "^28.1.1", | ||
"jest-extended": "^3.0.1", | ||
"jest-junit": "^14.0.0", | ||
"prettier": "^2.6.2", | ||
"rimraf": "^3.0.2", | ||
"shx": "^0.3.4", | ||
"ts-jest": "^28.0.5", | ||
"ts-node": "^10.9.1", | ||
"tsconfig-paths": "^3.9.0", | ||
"typedoc": "^0.22.15", | ||
"typescript": "^4.5.2" | ||
"typedoc": "^0.23.21", | ||
"typescript": "^4.9.3" | ||
} | ||
} |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
Debug access
Supply chain riskUses debug, reflection and dynamic code execution features.
Found 1 instance in 1 package
140501
36
2008
0
4
20
1
+ Added@matrixai/timer@^1.1.1
+ Added@matrixai/async-cancellable@1.1.1(transitive)
+ Added@matrixai/timer@1.1.3(transitive)
- Removedasync-mutex@^0.3.2
- Removedasync-mutex@0.3.2(transitive)
- Removedtslib@2.8.1(transitive)
Updated@matrixai/errors@^1.1.7
Updated@matrixai/resources@^1.1.5