redis-semaphore
Mutex and Semaphore implementations based on Redis ready for distributed systems
Features
- Fail-safe (all actions are performed by atomic Lua scripts)
Usage
Installation
npm install --save redis-semaphore ioredis
yarn add redis-semaphore ioredis
ioredis is the officially supported Redis client. This library's test code runs on it.
Users of other Redis clients should ensure that their client exposes an ioredis-compatible API (see src/types.ts) when creating lock objects.
Mutex
See RedisLabs: Locks with timeouts
new Mutex(redisClient, key [, { lockTimeout = 10000, acquireTimeout = 10000, acquireAttemptsLimit = Number.POSITIVE_INFINITY, retryInterval = 10, refreshInterval = lockTimeout * 0.8, identifier = crypto.randomUUID() }])
- redisClient - required, configured redis client
- key - required, key for locking resource (final key in redis: mutex:<key>)
- options - optional
  - lockTimeout - optional ms, time after which the mutex will be auto released (expired)
  - acquireTimeout - optional ms, max timeout for .acquire() call
  - acquireAttemptsLimit - optional, max number of attempts to be made in .acquire() call
  - retryInterval - optional ms, time between acquire attempts if the resource is locked
  - refreshInterval - optional ms, auto-refresh interval; set to 0 to disable auto-refresh
  - identifier - optional uuid, custom mutex identifier. Must be unique between parallel executors, otherwise multiple locks with the same identifier can be treated as the same lock holder. Override only if you know what you are doing (see the acquiredExternally option).
  - acquiredExternally - optional true. If identifier is provided and acquiredExternally is true, then _refresh will be used instead of _acquire in .tryAcquire()/.acquire(). Useful for lock sharing between processes: acquire in scheduler, refresh and release in handler.
  - onLockLost - optional function, called when lock loss is detected during the refresh cycle; the default onLockLost throws an unhandled LostLockError
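To make the interplay of acquireTimeout, acquireAttemptsLimit and retryInterval more concrete, here is a minimal sketch that simulates an acquire loop on a virtual clock, without Redis or real timers. It is not the library's code: simulateAcquire and the isFreeAt callback are hypothetical names, and the loop order (check the deadline and attempt budget, try, then wait retryInterval) is an assumption about how such a loop is commonly written.

```javascript
// Hypothetical helper: simulates an acquire loop on a virtual clock (no Redis, no timers).
// isFreeAt(ms) tells whether the resource is free at virtual time ms.
function simulateAcquire(isFreeAt, options = {}) {
  const {
    acquireTimeout = 10000,
    acquireAttemptsLimit = Number.POSITIVE_INFINITY,
    retryInterval = 10
  } = options
  let now = 0
  let attempts = 0
  // Stop when the deadline passes or the attempt budget is spent.
  while (now < acquireTimeout && attempts < acquireAttemptsLimit) {
    attempts++
    if (isFreeAt(now)) {
      return { acquired: true, attempts, elapsed: now }
    }
    now += retryInterval // wait before the next attempt
  }
  return { acquired: false, attempts, elapsed: now }
}

// The resource becomes free 35 ms after the first attempt.
const freeAfter35 = ms => ms >= 35

console.log(simulateAcquire(freeAfter35))
// { acquired: true, attempts: 5, elapsed: 40 }
console.log(simulateAcquire(freeAfter35, { acquireAttemptsLimit: 1 }))
// { acquired: false, attempts: 1, elapsed: 10 }
console.log(simulateAcquire(freeAfter35, { acquireTimeout: 30 }))
// { acquired: false, attempts: 3, elapsed: 30 }
```

With acquireAttemptsLimit set to 1 the loop degenerates into a single try, which is the pattern used for optional locks.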
Example
const Mutex = require('redis-semaphore').Mutex
const Redis = require('ioredis')

const redisClient = new Redis()

async function doSomething() {
  const mutex = new Mutex(redisClient, 'lockingResource')
  await mutex.acquire()
  try {
    // critical code
  } finally {
    await mutex.release()
  }
}
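The "lock with timeout" idea behind Mutex can be modelled in memory. The sketch below is an illustration only, not the library's implementation: FakeLockStore is a hypothetical class standing in for a single Redis key with an expiry, and the current time is passed in explicitly so the behaviour is deterministic.

```javascript
// Hypothetical in-memory stand-in for one Redis key with expiry.
// The clock is passed in explicitly as `now` (milliseconds).
class FakeLockStore {
  constructor() {
    this.holder = null // identifier of the current holder
    this.expiresAt = 0 // virtual time at which the lock expires
  }

  // Mirrors "set if absent, with expiry": succeeds only if nobody holds a live lock.
  acquire(identifier, lockTimeout, now) {
    if (this.holder !== null && now < this.expiresAt) return false
    this.holder = identifier
    this.expiresAt = now + lockTimeout
    return true
  }

  // Extends the expiry, but only for the current live holder.
  refresh(identifier, lockTimeout, now) {
    if (this.holder !== identifier || now >= this.expiresAt) return false
    this.expiresAt = now + lockTimeout
    return true
  }

  // Releases only if the caller is the holder, so a stale owner
  // cannot free a lock that now belongs to someone else.
  release(identifier) {
    if (this.holder !== identifier) return false
    this.holder = null
    return true
  }
}

const store = new FakeLockStore()
console.log(store.acquire('worker-a', 10000, 0)) // true
console.log(store.acquire('worker-b', 10000, 5000)) // false, still held by worker-a
console.log(store.acquire('worker-b', 10000, 10000)) // true, worker-a's lock expired
console.log(store.release('worker-a')) // false, worker-a is no longer the holder
console.log(store.refresh('worker-b', 10000, 15000)) // true
```

The identifier check on release and refresh is the reason identifiers must be unique between parallel executors.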
Example with lost lock handling
async function doSomething() {
  const mutex = new Mutex(redisClient, 'lockingResource', {
    // Called instead of the default handler, which throws an unhandled LostLockError
    onLockLost(err) {
      console.error(err)
    }
  })
  await mutex.acquire()
  try {
    while (mutex.isAcquired) {
      // critical cycle iteration
    }
  } finally {
    await mutex.release()
  }
}
Example with optional lock
async function doSomething() {
  const mutex = new Mutex(redisClient, 'lockingResource', {
    acquireAttemptsLimit: 1
  })
  const lockAcquired = await mutex.tryAcquire()
  if (!lockAcquired) {
    // Resource is busy, skip the work
    return
  }
  try {
    while (mutex.isAcquired) {
      // critical cycle iteration
    }
  } finally {
    await mutex.release()
  }
}
Example with temporary refresh
async function doSomething() {
  const mutex = new Mutex(redisClient, 'lockingResource', {
    lockTimeout: 120000,
    refreshInterval: 15000
  })
  const lockAcquired = await mutex.tryAcquire()
  if (!lockAcquired) {
    return
  }
  try {
    // critical code
  } finally {
    // Stop refreshing without releasing, so the lock expires on its own
    await mutex.stopRefresh()
  }
}
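To see how long the lock in this example outlives the work, the small calculation below estimates the expiry time once refreshing stops. It is a sketch under stated assumptions, not library behaviour: expiryAfterStop is a hypothetical helper, refreshes are assumed to fire at exact multiples of refreshInterval counted from the acquire time, and each refresh is assumed to reset the expiry to a full lockTimeout.

```javascript
// Hypothetical helper: when does the lock expire if refreshing stops at `stoppedAt` ms?
// Times are measured from the moment the lock was acquired.
function expiryAfterStop({ lockTimeout, refreshInterval }, stoppedAt) {
  // With refreshInterval = 0 auto-refresh is disabled, so only the initial acquire counts.
  const lastRefresh =
    refreshInterval > 0 ? Math.floor(stoppedAt / refreshInterval) * refreshInterval : 0
  return lastRefresh + lockTimeout
}

// Work finishes after 40 s: the last refresh was at 30 s, so the lock lives until 150 s.
console.log(expiryAfterStop({ lockTimeout: 120000, refreshInterval: 15000 }, 40000)) // 150000
// Without auto-refresh the lock simply expires lockTimeout after acquire.
console.log(expiryAfterStop({ lockTimeout: 120000, refreshInterval: 0 }, 40000)) // 120000
```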
Example with dynamically adjusting existing lock
const Mutex = require('redis-semaphore').Mutex
const Redis = require('ioredis')

const redisClient = new Redis()

// Original lock: short timeout, no auto-refresh
const preMutex = new Mutex(redisClient, 'lockingResource', {
  lockTimeout: 10 * 1e3, // 10 s
  refreshInterval: 0
})

// Same lock holder (same identifier) with a longer timeout and auto-refresh
const mutex = new Mutex(redisClient, 'lockingResource', {
  identifier: preMutex.identifier,
  acquiredExternally: true,
  lockTimeout: 30 * 60 * 1e3, // 30 min
  refreshInterval: 60 * 1e3 // 1 min
})
Example with shared lock between scheduler and handler apps
const Mutex = require('redis-semaphore').Mutex
const Redis = require('ioredis')

const redisClient = new Redis()

// someQueue and logger are application-specific and not defined here

// scheduler app code
async function every10MinutesCronScheduler() {
  const mutex = new Mutex(redisClient, 'lockingResource', {
    lockTimeout: 30 * 60 * 1e3, // 30 min
    refreshInterval: 0
  })
  if (await mutex.tryAcquire()) {
    // Pass the identifier on so the handler can take over the same lock
    someQueue.publish({ mutexIdentifier: mutex.identifier })
  } else {
    logger.info('Job already scheduled. Do nothing in current cron cycle')
  }
}

// handler app code
async function queueHandler(queueMessageData) {
  const { mutexIdentifier } = queueMessageData
  const mutex = new Mutex(redisClient, 'lockingResource', {
    lockTimeout: 10 * 1e3, // 10 s
    identifier: mutexIdentifier,
    acquiredExternally: true
  })
  // Refreshes the lock acquired by the scheduler instead of acquiring a new one
  await mutex.acquire()
  try {
    // critical code
  } finally {
    await mutex.release()
  }
}
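The scheduler-to-handler handoff can be illustrated with a small in-memory model. This is a sketch of the idea only: the locks map and the acquire, refresh and acquireOrAdopt functions are hypothetical names, the clock is passed in explicitly, and the rule that an adopted lock must still be live and carry the same identifier is an assumption of this model.

```javascript
// Hypothetical in-memory model: key -> { identifier, expiresAt }.
const locks = new Map()

function acquire(key, identifier, lockTimeout, now) {
  const current = locks.get(key)
  if (current && now < current.expiresAt) return false
  locks.set(key, { identifier, expiresAt: now + lockTimeout })
  return true
}

function refresh(key, identifier, lockTimeout, now) {
  const current = locks.get(key)
  if (!current || current.identifier !== identifier || now >= current.expiresAt) return false
  current.expiresAt = now + lockTimeout
  return true
}

// With acquiredExternally the "acquire" step becomes a refresh of the existing lock.
function acquireOrAdopt(key, { identifier, acquiredExternally, lockTimeout }, now) {
  return acquiredExternally
    ? refresh(key, identifier, lockTimeout, now)
    : acquire(key, identifier, lockTimeout, now)
}

// Scheduler takes the lock for 30 minutes and passes its identifier on.
const schedulerOk = acquireOrAdopt('job', { identifier: 'id-1', lockTimeout: 30 * 60 * 1e3 }, 0)
// Handler adopts the same lock one minute later with a shorter timeout.
const handlerOk = acquireOrAdopt(
  'job',
  { identifier: 'id-1', acquiredExternally: true, lockTimeout: 10 * 1e3 },
  60 * 1e3
)
// A caller carrying a different identifier cannot adopt it.
const strangerOk = acquireOrAdopt(
  'job',
  { identifier: 'id-2', acquiredExternally: true, lockTimeout: 10 * 1e3 },
  61 * 1e3
)

console.log(schedulerOk, handlerOk, strangerOk) // true true false
console.log(locks.get('job').expiresAt) // 70000
```

Note how adopting the lock also replaces the scheduler's long timeout with the handler's shorter one.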
Semaphore
See RedisLabs: Basic counting semaphore
This implementation is slightly different from the algorithm described in the book, but the main idea has not changed.
The zrank check is replaced with zcard, so it is now as fair as RedisLabs: Fair semaphore (see tests).
In edge cases (node time difference is greater than lockTimeout) both algorithms are unfair because of the cleanup stage (removing expired members from the sorted set), so the FairSemaphore API has been removed (it is safe to replace it with Semaphore).
The most reliable way to use it: lockTimeout is greater than the possible node clock differences, and refreshInterval is not 0 and is sufficiently smaller than lockTimeout (the default is lockTimeout * 0.8).
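The cleanup stage and the cardinality check described above can be sketched with an in-memory stand-in for the sorted set. This is a simplified model and not the library's Lua script: FakeSemaphoreStore is a hypothetical class, and the caller supplies the current time, which also shows why clock differences between nodes matter.

```javascript
// Hypothetical in-memory stand-in for the sorted set:
// identifier -> timestamp of the last acquire.
class FakeSemaphoreStore {
  constructor(maxCount, lockTimeout) {
    this.maxCount = maxCount
    this.lockTimeout = lockTimeout
    this.members = new Map()
  }

  // Cleanup stage: drop members whose timestamp is older than lockTimeout.
  removeExpired(now) {
    for (const [id, timestamp] of this.members) {
      if (timestamp <= now - this.lockTimeout) this.members.delete(id)
    }
  }

  // Cardinality check: admit a new holder only while fewer than maxCount are live.
  acquire(identifier, now) {
    this.removeExpired(now)
    if (this.members.size >= this.maxCount) return false
    this.members.set(identifier, now)
    return true
  }

  release(identifier) {
    return this.members.delete(identifier)
  }
}

const sem = new FakeSemaphoreStore(2, 10000)
console.log(sem.acquire('a', 0)) // true
console.log(sem.acquire('b', 1000)) // true
console.log(sem.acquire('c', 2000)) // false, both permits are taken
console.log(sem.acquire('c', 10000)) // true, 'a' expired and was cleaned up
console.log(sem.release('b')) // true
```

A node whose clock runs ahead by more than lockTimeout would pass a `now` that evicts holders that are still live, which is the edge case mentioned above.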
new Semaphore(redisClient, key, maxCount [, { lockTimeout = 10000, acquireTimeout = 10000, acquireAttemptsLimit = Number.POSITIVE_INFINITY, retryInterval = 10, refreshInterval = lockTimeout * 0.8 }])
- redisClient - required, configured redis client
- key - required, key for locking resource (final key in redis: semaphore:<key>)
- maxCount - required, maximum simultaneous resource usage count
- options - optional, see Mutex options
Example
const Semaphore = require('redis-semaphore').Semaphore
const Redis = require('ioredis')

const redisClient = new Redis()

async function doSomething() {
  // At most 5 holders at the same time
  const semaphore = new Semaphore(redisClient, 'lockingResource', 5)
  await semaphore.acquire()
  try {
    // critical code
  } finally {
    await semaphore.release()
  }
}
MultiSemaphore
Same as Semaphore with one difference: MultiSemaphore tries to acquire multiple permits instead of one.
MultiSemaphore and Semaphore share the same key namespace and can be used together (see test/src/RedisMultiSemaphore.test.ts).
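As an illustration of single-permit and multi-permit holders drawing from one shared pool, here is a minimal in-memory sketch. It is not the library's implementation: createPermitPool and its methods are hypothetical names, expiry is left out for brevity, and the all-or-nothing rule for multi-permit acquires is an assumption of this model.

```javascript
// Hypothetical shared permit pool: one entry per held permit.
function createPermitPool(maxCount) {
  const held = new Set()
  return {
    // Single-permit acquire, as a Semaphore-like holder would do.
    acquireOne(identifier) {
      return this.acquireMany(identifier, 1)
    },
    // All-or-nothing acquire of several permits, as a MultiSemaphore-like holder would do.
    acquireMany(identifier, permits) {
      if (held.size + permits > maxCount) return false
      for (let i = 0; i < permits; i++) held.add(`${identifier}_${i}`)
      return true
    },
    release(identifier, permits = 1) {
      for (let i = 0; i < permits; i++) held.delete(`${identifier}_${i}`)
    },
    get used() {
      return held.size
    }
  }
}

const pool = createPermitPool(5)
console.log(pool.acquireMany('multi-a', 2)) // true, 2 of 5 used
console.log(pool.acquireOne('single-b')) // true, 3 of 5 used
console.log(pool.acquireMany('multi-c', 3)) // false, only 2 permits left
console.log(pool.used) // 3
pool.release('multi-a', 2)
console.log(pool.acquireMany('multi-c', 3)) // true, 4 of 5 used
```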
new MultiSemaphore(redisClient, key, maxCount, permits [, { lockTimeout = 10000, acquireTimeout = 10000, acquireAttemptsLimit = Number.POSITIVE_INFINITY, retryInterval = 10, refreshInterval = lockTimeout * 0.8 }])
- redisClient - required, configured redis client
- key - required, key for locking resource (final key in redis: semaphore:<key>)
- maxCount - required, maximum simultaneous resource usage count
- permits - required, number of permits to acquire
- options - optional, see Mutex options
Example
const MultiSemaphore = require('redis-semaphore').MultiSemaphore
const Redis = require('ioredis')

const redisClient = new Redis()

async function doSomething() {
  // Takes 2 of the 5 available permits
  const semaphore = new MultiSemaphore(redisClient, 'lockingResource', 5, 2)
  await semaphore.acquire()
  try {
    // critical code
  } finally {
    await semaphore.release()
  }
}
RedlockMutex
Distributed Mutex version
See The Redlock algorithm
new RedlockMutex(redisClients, key [, { lockTimeout = 10000, acquireTimeout = 10000, acquireAttemptsLimit = Number.POSITIVE_INFINITY, retryInterval = 10, refreshInterval = lockTimeout * 0.8 }])
- redisClients - required, array of configured redis clients connected to independent nodes
- key - required, key for locking resource (final key in redis: mutex:<key>)
- options - optional, see Mutex options
Example
const RedlockMutex = require('redis-semaphore').RedlockMutex
const Redis = require('ioredis')

// Clients connected to independent Redis nodes
const redisClients = [
  new Redis('127.0.0.1:6377'),
  new Redis('127.0.0.1:6378'),
  new Redis('127.0.0.1:6379')
]

async function doSomething() {
  const mutex = new RedlockMutex(redisClients, 'lockingResource')
  await mutex.acquire()
  try {
    // critical code
  } finally {
    await mutex.release()
  }
}
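The Redlock algorithm considers a lock acquired only when a majority of the independent nodes grant it. The sketch below shows just that majority rule. quorumSize and hasQuorum are hypothetical helpers, not part of this library's API, and the validity-time check that Redlock also requires is left out.

```javascript
// Hypothetical helper: smallest number of nodes that forms a strict majority.
function quorumSize(nodeCount) {
  return Math.floor(nodeCount / 2) + 1
}

// perNodeResults holds one boolean per node: did that node grant the lock?
function hasQuorum(perNodeResults) {
  const granted = perNodeResults.filter(Boolean).length
  return granted >= quorumSize(perNodeResults.length)
}

console.log(quorumSize(3)) // 2
console.log(quorumSize(5)) // 3
console.log(hasQuorum([true, true, false])) // true
console.log(hasQuorum([true, false, false])) // false
```

With the three nodes used in the example above, the lock survives the loss of any single node.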
RedlockSemaphore
Distributed Semaphore version
See The Redlock algorithm
new RedlockSemaphore(redisClients, key, maxCount [, { lockTimeout = 10000, acquireTimeout = 10000, acquireAttemptsLimit = Number.POSITIVE_INFINITY, retryInterval = 10, refreshInterval = lockTimeout * 0.8 }])
- redisClients - required, array of configured redis clients connected to independent nodes
- key - required, key for locking resource (final key in redis: semaphore:<key>)
- maxCount - required, maximum simultaneous resource usage count
- options - optional, see Mutex options
Example
const RedlockSemaphore = require('redis-semaphore').RedlockSemaphore
const Redis = require('ioredis')

// Clients connected to independent Redis nodes
const redisClients = [
  new Redis('127.0.0.1:6377'),
  new Redis('127.0.0.1:6378'),
  new Redis('127.0.0.1:6379')
]

async function doSomething() {
  const semaphore = new RedlockSemaphore(redisClients, 'lockingResource', 5)
  await semaphore.acquire()
  try {
    // critical code
  } finally {
    await semaphore.release()
  }
}
RedlockMultiSemaphore
Distributed MultiSemaphore version
See The Redlock algorithm
new RedlockMultiSemaphore(redisClients, key, maxCount, permits [, { lockTimeout = 10000, acquireTimeout = 10000, acquireAttemptsLimit = Number.POSITIVE_INFINITY, retryInterval = 10, refreshInterval = lockTimeout * 0.8 }])
- redisClients - required, array of configured redis clients connected to independent nodes
- key - required, key for locking resource (final key in redis: semaphore:<key>)
- maxCount - required, maximum simultaneous resource usage count
- permits - required, number of permits to acquire
- options - optional, see Mutex options
Example
const RedlockMultiSemaphore = require('redis-semaphore').RedlockMultiSemaphore
const Redis = require('ioredis')

// Clients connected to independent Redis nodes
const redisClients = [
  new Redis('127.0.0.1:6377'),
  new Redis('127.0.0.1:6378'),
  new Redis('127.0.0.1:6379')
]

async function doSomething() {
  // Takes 2 of the 5 available permits
  const semaphore = new RedlockMultiSemaphore(
    redisClients,
    'lockingResource',
    5,
    2
  )
  await semaphore.acquire()
  try {
    // critical code
  } finally {
    await semaphore.release()
  }
}
Development
yarn --immutable
./setup-redis-servers.sh
yarn dev
License
MIT