advisory-lock
Advanced tools
Comparing version 1.2.0 to 2.0.0
@@ -1,11 +0,15 @@ | ||
import pg from "pg"; | ||
export declare const strToKey: (name: string) => AdvisoryKey; | ||
type AdvisoryKey = [number, number]; | ||
interface FunctionObject { | ||
[key: string]: (...args: any[]) => any; | ||
interface CreateMutexFunction { | ||
(lockName: string): AdvisoryLock; | ||
} | ||
declare const _default: (conString: string) => { | ||
(name: string): FunctionObject; | ||
client: pg.Client; | ||
}; | ||
type WithLockFunction = (fn: () => Promise<unknown>) => Promise<unknown>; | ||
type UnlockFn = () => Promise<void>; | ||
interface AdvisoryLock { | ||
lock: () => Promise<UnlockFn>; | ||
unlock: UnlockFn; | ||
tryLock: () => Promise<UnlockFn | undefined>; | ||
withLock: WithLockFunction; | ||
} | ||
declare const _default: (conString: string) => CreateMutexFunction; | ||
export default _default; |
"use strict"; | ||
var __awaiter = (this && this.__awaiter) || function (thisArg, _arguments, P, generator) { | ||
function adopt(value) { return value instanceof P ? value : new P(function (resolve) { resolve(value); }); } | ||
return new (P || (P = Promise))(function (resolve, reject) { | ||
function fulfilled(value) { try { step(generator.next(value)); } catch (e) { reject(e); } } | ||
function rejected(value) { try { step(generator["throw"](value)); } catch (e) { reject(e); } } | ||
function step(result) { result.done ? resolve(result.value) : adopt(result.value).then(fulfilled, rejected); } | ||
step((generator = generator.apply(thisArg, _arguments || [])).next()); | ||
}); | ||
}; | ||
var __generator = (this && this.__generator) || function (thisArg, body) { | ||
var _ = { label: 0, sent: function() { if (t[0] & 1) throw t[1]; return t[1]; }, trys: [], ops: [] }, f, y, t, g; | ||
return g = { next: verb(0), "throw": verb(1), "return": verb(2) }, typeof Symbol === "function" && (g[Symbol.iterator] = function() { return this; }), g; | ||
function verb(n) { return function (v) { return step([n, v]); }; } | ||
function step(op) { | ||
if (f) throw new TypeError("Generator is already executing."); | ||
while (g && (g = 0, op[0] && (_ = 0)), _) try { | ||
if (f = 1, y && (t = op[0] & 2 ? y["return"] : op[0] ? y["throw"] || ((t = y["return"]) && t.call(y), 0) : y.next) && !(t = t.call(y, op[1])).done) return t; | ||
if (y = 0, t) op = [op[0] & 2, t.value]; | ||
switch (op[0]) { | ||
case 0: case 1: t = op; break; | ||
case 4: _.label++; return { value: op[1], done: false }; | ||
case 5: _.label++; y = op[1]; op = [0]; continue; | ||
case 7: op = _.ops.pop(); _.trys.pop(); continue; | ||
default: | ||
if (!(t = _.trys, t = t.length > 0 && t[t.length - 1]) && (op[0] === 6 || op[0] === 2)) { _ = 0; continue; } | ||
if (op[0] === 3 && (!t || (op[1] > t[0] && op[1] < t[3]))) { _.label = op[1]; break; } | ||
if (op[0] === 6 && _.label < t[1]) { _.label = t[1]; t = op; break; } | ||
if (t && _.label < t[2]) { _.label = t[2]; _.ops.push(op); break; } | ||
if (t[2]) _.ops.pop(); | ||
_.trys.pop(); continue; | ||
} | ||
op = body.call(thisArg, _); | ||
} catch (e) { op = [6, e]; y = 0; } finally { f = t = 0; } | ||
if (op[0] & 5) throw op[1]; return { value: op[0] ? op[1] : void 0, done: true }; | ||
} | ||
}; | ||
var __importDefault = (this && this.__importDefault) || function (mod) { | ||
@@ -24,133 +60,176 @@ return (mod && mod.__esModule) ? mod : { "default": mod }; | ||
exports.strToKey = strToKey; | ||
// Patches client so that unref works as expected... Node terminates | ||
// only if there are not pending queries | ||
var patchClient = function (client) { | ||
var connect = client.connect.bind(client); | ||
var query = client.query.bind(client); | ||
var refCount = 0; | ||
var ref = function () { | ||
refCount++; | ||
/* @ts-ignore */ | ||
client.connection.stream.ref(); | ||
}; | ||
var unref = function () { | ||
refCount--; | ||
/* @ts-ignore */ | ||
if (!refCount) | ||
client.connection.stream.unref(); | ||
}; | ||
var wrap = function (fn) { | ||
return function () { | ||
var args = []; | ||
for (var _i = 0; _i < arguments.length; _i++) { | ||
args[_i] = arguments[_i]; | ||
} | ||
ref(); | ||
var lastArg = args[args.length - 1]; | ||
var lastArgIsCb = typeof lastArg === "function"; | ||
var outerCb = lastArgIsCb ? lastArg : noop; | ||
if (lastArgIsCb) | ||
args.pop(); | ||
var cb = function () { | ||
var cbArgs = []; | ||
for (var _i = 0; _i < arguments.length; _i++) { | ||
cbArgs[_i] = arguments[_i]; | ||
} | ||
unref(); | ||
outerCb.apply(void 0, cbArgs); | ||
}; | ||
/* @ts-ignore */ | ||
args.push(cb); | ||
return fn.apply(void 0, args); | ||
}; | ||
}; | ||
client.connect = wrap(connect); | ||
client.query = wrap(query); | ||
return client; | ||
}; | ||
var query = function (client, lockFn, _a) { | ||
function query(client, lockFn, _a) { | ||
var key1 = _a[0], key2 = _a[1]; | ||
return new Promise(function (resolve, reject) { | ||
var sql = "SELECT ".concat(lockFn, "(").concat(key1, ", ").concat(key2, ")"); | ||
debug("query: ".concat(sql)); | ||
client.query(sql, function (err, result) { | ||
if (err) { | ||
debug(err); | ||
return reject(err); | ||
return __awaiter(this, void 0, void 0, function () { | ||
var sql, result; | ||
return __generator(this, function (_b) { | ||
switch (_b.label) { | ||
case 0: | ||
sql = "SELECT ".concat(lockFn, "(").concat(key1, ", ").concat(key2, ")"); | ||
debug("query: ".concat(sql)); | ||
return [4 /*yield*/, client.query(sql)]; | ||
case 1: | ||
result = _b.sent(); | ||
return [2 /*return*/, result.rows[0][lockFn]]; | ||
} | ||
resolve(result.rows[0][lockFn]); | ||
}); | ||
}); | ||
}; | ||
// Pauses promise chain until pg client is connected | ||
var initWaitForConnection = function (client) { | ||
var queue = []; | ||
var waitForConnect = true; | ||
debug("connecting"); | ||
client.connect(function (err) { | ||
waitForConnect = false; | ||
if (err) { | ||
debug("connection error"); | ||
debug(err); | ||
queue.forEach(function (_a) { | ||
var reject = _a[1]; | ||
return reject(err); | ||
} | ||
exports["default"] = (function (conString) { | ||
debug("connection string: ".concat(conString)); | ||
var createMutex = function (name) { | ||
var key = typeof name === "string" ? (0, exports.strToKey)(name) : name; | ||
function newClient() { | ||
return __awaiter(this, void 0, void 0, function () { | ||
var client; | ||
return __generator(this, function (_a) { | ||
switch (_a.label) { | ||
case 0: | ||
client = new pg_1["default"].Client({ | ||
connectionString: conString | ||
}); | ||
return [4 /*yield*/, client.connect()]; | ||
case 1: | ||
_a.sent(); | ||
return [2 /*return*/, client]; | ||
} | ||
}); | ||
}); | ||
} | ||
else { | ||
debug("connected"); | ||
queue.forEach(function (_a) { | ||
var resolve = _a[0]; | ||
return resolve(); | ||
// for backwards compatibility... | ||
var cachedUnlock; | ||
function unlock() { | ||
return __awaiter(this, void 0, void 0, function () { | ||
return __generator(this, function (_a) { | ||
if (cachedUnlock) { | ||
return [2 /*return*/, cachedUnlock()]; | ||
} | ||
return [2 /*return*/]; | ||
}); | ||
}); | ||
} | ||
}); | ||
return function () { | ||
return new Promise(function (resolve, reject) { | ||
if (!waitForConnect) | ||
return resolve(); | ||
debug("waiting for connection"); | ||
queue.push([resolve, reject]); | ||
}); | ||
}; | ||
}; | ||
exports["default"] = (function (conString) { | ||
debug("connection string: ".concat(conString)); | ||
var client = patchClient(new pg_1["default"].Client(conString)); | ||
var waitForConnection = initWaitForConnection(client); | ||
// TODO: client.connection.stream.unref()? | ||
var createMutex = function (name) { | ||
var key = typeof name === "string" ? (0, exports.strToKey)(name) : name; | ||
var lock = function () { return query(client, "pg_advisory_lock", key); }; | ||
var unlock = function () { return query(client, "pg_advisory_unlock", key); }; | ||
var tryLock = function () { return query(client, "pg_try_advisory_lock", key); }; | ||
// lock and unlock share a client because the lock is tied to a connection | ||
function lock() { | ||
return __awaiter(this, void 0, void 0, function () { | ||
var client, unlockFn, err_1; | ||
return __generator(this, function (_a) { | ||
switch (_a.label) { | ||
case 0: return [4 /*yield*/, newClient()]; | ||
case 1: | ||
client = _a.sent(); | ||
_a.label = 2; | ||
case 2: | ||
_a.trys.push([2, 4, , 5]); | ||
return [4 /*yield*/, query(client, "pg_advisory_lock", key)]; | ||
case 3: | ||
_a.sent(); | ||
unlockFn = function unlock() { | ||
return __awaiter(this, void 0, void 0, function () { | ||
return __generator(this, function (_a) { | ||
switch (_a.label) { | ||
case 0: | ||
_a.trys.push([0, , 2, 3]); | ||
return [4 /*yield*/, query(client, "pg_advisory_unlock", key)]; | ||
case 1: | ||
_a.sent(); | ||
return [3 /*break*/, 3]; | ||
case 2: | ||
client.end(); | ||
return [7 /*endfinally*/]; | ||
case 3: return [2 /*return*/]; | ||
} | ||
}); | ||
}); | ||
}; | ||
cachedUnlock = unlockFn; | ||
return [2 /*return*/, unlockFn]; | ||
case 4: | ||
err_1 = _a.sent(); | ||
client.end(); | ||
throw err_1; | ||
case 5: return [2 /*return*/]; | ||
} | ||
}); | ||
}); | ||
} | ||
function tryLock() { | ||
return __awaiter(this, void 0, void 0, function () { | ||
var client, obtained, unlockFn, err_2; | ||
return __generator(this, function (_a) { | ||
switch (_a.label) { | ||
case 0: return [4 /*yield*/, newClient()]; | ||
case 1: | ||
client = _a.sent(); | ||
_a.label = 2; | ||
case 2: | ||
_a.trys.push([2, 4, , 5]); | ||
return [4 /*yield*/, query(client, "pg_try_advisory_lock", key)]; | ||
case 3: | ||
obtained = _a.sent(); | ||
if (obtained) { | ||
unlockFn = function unlock() { | ||
return __awaiter(this, void 0, void 0, function () { | ||
return __generator(this, function (_a) { | ||
switch (_a.label) { | ||
case 0: | ||
_a.trys.push([0, , 2, 3]); | ||
return [4 /*yield*/, query(client, "pg_advisory_unlock", key)]; | ||
case 1: | ||
_a.sent(); | ||
return [3 /*break*/, 3]; | ||
case 2: | ||
client.end(); | ||
return [7 /*endfinally*/]; | ||
case 3: return [2 /*return*/]; | ||
} | ||
}); | ||
}); | ||
}; | ||
cachedUnlock = unlockFn; | ||
return [2 /*return*/, unlockFn]; | ||
} | ||
else { | ||
client.end(); | ||
} | ||
return [3 /*break*/, 5]; | ||
case 4: | ||
err_2 = _a.sent(); | ||
client.end(); | ||
throw err_2; | ||
case 5: return [2 /*return*/]; | ||
} | ||
}); | ||
}); | ||
} | ||
// TODO: catch db disconnection errors? | ||
var withLock = function (fn) { | ||
return lock().then(function () { | ||
return Promise.resolve() | ||
.then(fn) | ||
.then(function (res) { return unlock().then(function () { return res; }); }, function (err) { | ||
return unlock().then(function () { | ||
throw err; | ||
}); | ||
var _this = this; | ||
if (fn === void 0) { fn = function () { return __awaiter(_this, void 0, void 0, function () { return __generator(this, function (_a) { | ||
return [2 /*return*/]; | ||
}); }); }; } | ||
return __awaiter(this, void 0, void 0, function () { | ||
var unlock; | ||
return __generator(this, function (_a) { | ||
switch (_a.label) { | ||
case 0: return [4 /*yield*/, lock()]; | ||
case 1: | ||
unlock = _a.sent(); | ||
_a.label = 2; | ||
case 2: | ||
_a.trys.push([2, , 4, 6]); | ||
return [4 /*yield*/, fn()]; | ||
case 3: return [2 /*return*/, _a.sent()]; | ||
case 4: return [4 /*yield*/, unlock()]; | ||
case 5: | ||
_a.sent(); | ||
return [7 /*endfinally*/]; | ||
case 6: return [2 /*return*/]; | ||
} | ||
}); | ||
}); | ||
}; | ||
var fns = { lock: lock, unlock: unlock, tryLock: tryLock, withLock: withLock }; | ||
// "Block" function calls until client is connected | ||
var guardedFns = {}; | ||
Object.keys(fns).forEach(function (fnName) { | ||
guardedFns[fnName] = function () { | ||
var args = []; | ||
for (var _i = 0; _i < arguments.length; _i++) { | ||
args[_i] = arguments[_i]; | ||
} | ||
return waitForConnection().then(function () { return fns[fnName].apply(fns, args); }); | ||
}; | ||
}); | ||
return guardedFns; | ||
return { lock: lock, unlock: unlock, tryLock: tryLock, withLock: withLock }; | ||
}; | ||
createMutex.client = client; | ||
return createMutex; | ||
}); | ||
//# sourceMappingURL=index.js.map |
@@ -25,2 +25,38 @@ "use strict"; | ||
}; | ||
var __awaiter = (this && this.__awaiter) || function (thisArg, _arguments, P, generator) { | ||
function adopt(value) { return value instanceof P ? value : new P(function (resolve) { resolve(value); }); } | ||
return new (P || (P = Promise))(function (resolve, reject) { | ||
function fulfilled(value) { try { step(generator.next(value)); } catch (e) { reject(e); } } | ||
function rejected(value) { try { step(generator["throw"](value)); } catch (e) { reject(e); } } | ||
function step(result) { result.done ? resolve(result.value) : adopt(result.value).then(fulfilled, rejected); } | ||
step((generator = generator.apply(thisArg, _arguments || [])).next()); | ||
}); | ||
}; | ||
var __generator = (this && this.__generator) || function (thisArg, body) { | ||
var _ = { label: 0, sent: function() { if (t[0] & 1) throw t[1]; return t[1]; }, trys: [], ops: [] }, f, y, t, g; | ||
return g = { next: verb(0), "throw": verb(1), "return": verb(2) }, typeof Symbol === "function" && (g[Symbol.iterator] = function() { return this; }), g; | ||
function verb(n) { return function (v) { return step([n, v]); }; } | ||
function step(op) { | ||
if (f) throw new TypeError("Generator is already executing."); | ||
while (g && (g = 0, op[0] && (_ = 0)), _) try { | ||
if (f = 1, y && (t = op[0] & 2 ? y["return"] : op[0] ? y["throw"] || ((t = y["return"]) && t.call(y), 0) : y.next) && !(t = t.call(y, op[1])).done) return t; | ||
if (y = 0, t) op = [op[0] & 2, t.value]; | ||
switch (op[0]) { | ||
case 0: case 1: t = op; break; | ||
case 4: _.label++; return { value: op[1], done: false }; | ||
case 5: _.label++; y = op[1]; op = [0]; continue; | ||
case 7: op = _.ops.pop(); _.trys.pop(); continue; | ||
default: | ||
if (!(t = _.trys, t = t.length > 0 && t[t.length - 1]) && (op[0] === 6 || op[0] === 2)) { _ = 0; continue; } | ||
if (op[0] === 3 && (!t || (op[1] > t[0] && op[1] < t[3]))) { _.label = op[1]; break; } | ||
if (op[0] === 6 && _.label < t[1]) { _.label = t[1]; t = op; break; } | ||
if (t && _.label < t[2]) { _.label = t[2]; _.ops.push(op); break; } | ||
if (t[2]) _.ops.pop(); | ||
_.trys.pop(); continue; | ||
} | ||
op = body.call(thisArg, _); | ||
} catch (e) { op = [6, e]; y = 0; } finally { f = t = 0; } | ||
if (op[0] & 5) throw op[1]; return { value: op[0] ? op[1] : void 0, done: true }; | ||
} | ||
}; | ||
var __importDefault = (this && this.__importDefault) || function (mod) { | ||
@@ -38,3 +74,3 @@ return (mod && mod.__esModule) ? mod : { "default": mod }; | ||
}); | ||
(0, tape_1["default"])("lock/unlock on same connection", function (t) { | ||
(0, tape_1["default"])("lock/unlock", function (t) { | ||
t.plan(3); | ||
@@ -44,16 +80,32 @@ var getMutex = (0, __1["default"])(common_1.conString); | ||
var i = 0; | ||
var testLockUnlock = function (iVal) { | ||
return lock() | ||
.then(function () { | ||
t.equal(i, iVal, "i is equal to 0"); | ||
i++; | ||
// wait 300ms before decrementing i | ||
return (0, common_1.timeout)(300); | ||
}) | ||
.then(function () { return i--; }) | ||
.then(unlock)["catch"](t.fail); | ||
}; | ||
testLockUnlock(0); | ||
testLockUnlock(1); | ||
testLockUnlock(2); | ||
var testLockUnlock = function () { return __awaiter(void 0, void 0, void 0, function () { | ||
var err_1; | ||
return __generator(this, function (_a) { | ||
switch (_a.label) { | ||
case 0: | ||
_a.trys.push([0, 4, , 5]); | ||
return [4 /*yield*/, lock()]; | ||
case 1: | ||
_a.sent(); | ||
t.equal(i, 0, "".concat(i, " is equal to 0")); | ||
i++; | ||
return [4 /*yield*/, (0, common_1.timeout)(300)]; | ||
case 2: | ||
_a.sent(); | ||
i--; | ||
return [4 /*yield*/, unlock()]; | ||
case 3: | ||
_a.sent(); | ||
return [3 /*break*/, 5]; | ||
case 4: | ||
err_1 = _a.sent(); | ||
t.fail(err_1); | ||
return [3 /*break*/, 5]; | ||
case 5: return [2 /*return*/]; | ||
} | ||
}); | ||
}); }; | ||
testLockUnlock(); | ||
testLockUnlock(); | ||
testLockUnlock(); | ||
// we can acquire lock both times because we're using the same connection | ||
@@ -84,42 +136,74 @@ }); | ||
}); | ||
(0, tape_1["default"])("tryLock", function (t) { | ||
var mutex1 = (0, __1["default"])(common_1.conString)("test-try-lock"); | ||
var mutex2 = (0, __1["default"])(common_1.conString)("test-try-lock"); | ||
mutex1 | ||
.tryLock() | ||
.then(function (obtained) { | ||
t.equal(obtained, true); | ||
}) | ||
.then(function () { return mutex2.tryLock(); }) | ||
.then(function (obtained) { | ||
t.equal(obtained, false); | ||
}) | ||
.then(function () { return mutex1.unlock(); }) | ||
.then(function () { return mutex2.tryLock(); }) | ||
.then(function (obtained) { | ||
t.equal(obtained, true); | ||
}) | ||
.then(function () { return mutex1.tryLock(); }) | ||
.then(function (obtained) { | ||
t.equal(obtained, false); | ||
}) | ||
.then(function () { return mutex2.unlock(); }) | ||
.then(function () { return t.end(); })["catch"](t.fail); | ||
}); | ||
(0, tape_1["default"])("withLock followed by tryLock", function (t) { | ||
var mutex1 = (0, __1["default"])(common_1.conString)("test-withlock-lock"); | ||
var mutex2 = (0, __1["default"])(common_1.conString)("test-withlock-lock"); | ||
mutex1 | ||
.withLock(function () { | ||
return mutex2 | ||
.tryLock() | ||
.then(function (obtained) { return t.equal(obtained, false); }) | ||
.then(function () { return "someval"; }); | ||
}) | ||
.then(function (res) { return t.equal(res, "someval"); }) | ||
.then(function () { return mutex2.tryLock(); }) | ||
.then(function (obtained) { return t.equal(obtained, true); }) | ||
.then(function () { return mutex2.unlock(); }) | ||
.then(function () { return t.end(); })["catch"](t.fail); | ||
}); | ||
(0, tape_1["default"])("tryLock", function (t) { return __awaiter(void 0, void 0, void 0, function () { | ||
function assertAcquired(val) { | ||
t.equal(typeof val, "function", "acquired"); | ||
} | ||
function assertNotAcquired(val) { | ||
t.equal(typeof val, "undefined", "not acquired"); | ||
} | ||
var mutex1, mutex2, _a, _b, _c, _d; | ||
return __generator(this, function (_e) { | ||
switch (_e.label) { | ||
case 0: | ||
mutex1 = (0, __1["default"])(common_1.conString)("test-try-lock"); | ||
mutex2 = (0, __1["default"])(common_1.conString)("test-try-lock"); | ||
_a = assertAcquired; | ||
return [4 /*yield*/, mutex1.tryLock()]; | ||
case 1: | ||
_a.apply(void 0, [_e.sent()]); | ||
_b = assertNotAcquired; | ||
return [4 /*yield*/, mutex2.tryLock()]; | ||
case 2: | ||
_b.apply(void 0, [_e.sent()]); | ||
return [4 /*yield*/, mutex1.unlock()]; | ||
case 3: | ||
_e.sent(); | ||
_c = assertAcquired; | ||
return [4 /*yield*/, mutex2.tryLock()]; | ||
case 4: | ||
_c.apply(void 0, [_e.sent()]); | ||
_d = assertNotAcquired; | ||
return [4 /*yield*/, mutex1.tryLock()]; | ||
case 5: | ||
_d.apply(void 0, [_e.sent()]); | ||
return [4 /*yield*/, mutex2.unlock()]; | ||
case 6: | ||
_e.sent(); | ||
return [2 /*return*/]; | ||
} | ||
}); | ||
}); }); | ||
(0, tape_1["default"])("withLock followed by tryLock", function (t) { return __awaiter(void 0, void 0, void 0, function () { | ||
var mutex1, mutex2, val, unlock; | ||
return __generator(this, function (_a) { | ||
switch (_a.label) { | ||
case 0: | ||
mutex1 = (0, __1["default"])(common_1.conString)("test-withlock-lock"); | ||
mutex2 = (0, __1["default"])(common_1.conString)("test-withlock-lock"); | ||
return [4 /*yield*/, mutex1.withLock(function () { return __awaiter(void 0, void 0, void 0, function () { | ||
var unlock; | ||
return __generator(this, function (_a) { | ||
switch (_a.label) { | ||
case 0: return [4 /*yield*/, mutex2.tryLock()]; | ||
case 1: | ||
unlock = _a.sent(); | ||
t.equal(typeof unlock, "undefined"); | ||
return [2 /*return*/, "someval"]; | ||
} | ||
}); | ||
}); })]; | ||
case 1: | ||
val = _a.sent(); | ||
t.equal(val, "someval"); | ||
return [4 /*yield*/, mutex2.tryLock()]; | ||
case 2: | ||
unlock = _a.sent(); | ||
t.equal(typeof unlock, "function"); | ||
return [4 /*yield*/, unlock()]; | ||
case 3: | ||
_a.sent(); | ||
return [2 /*return*/]; | ||
} | ||
}); | ||
}); }); | ||
(0, tape_1["default"])("withLock - no promise", function (t) { | ||
@@ -126,0 +210,0 @@ var mutex1 = (0, __1["default"])(common_1.conString)("test-withlock-lock"); |
"use strict"; | ||
var __awaiter = (this && this.__awaiter) || function (thisArg, _arguments, P, generator) { | ||
function adopt(value) { return value instanceof P ? value : new P(function (resolve) { resolve(value); }); } | ||
return new (P || (P = Promise))(function (resolve, reject) { | ||
function fulfilled(value) { try { step(generator.next(value)); } catch (e) { reject(e); } } | ||
function rejected(value) { try { step(generator["throw"](value)); } catch (e) { reject(e); } } | ||
function step(result) { result.done ? resolve(result.value) : adopt(result.value).then(fulfilled, rejected); } | ||
step((generator = generator.apply(thisArg, _arguments || [])).next()); | ||
}); | ||
}; | ||
var __generator = (this && this.__generator) || function (thisArg, body) { | ||
var _ = { label: 0, sent: function() { if (t[0] & 1) throw t[1]; return t[1]; }, trys: [], ops: [] }, f, y, t, g; | ||
return g = { next: verb(0), "throw": verb(1), "return": verb(2) }, typeof Symbol === "function" && (g[Symbol.iterator] = function() { return this; }), g; | ||
function verb(n) { return function (v) { return step([n, v]); }; } | ||
function step(op) { | ||
if (f) throw new TypeError("Generator is already executing."); | ||
while (g && (g = 0, op[0] && (_ = 0)), _) try { | ||
if (f = 1, y && (t = op[0] & 2 ? y["return"] : op[0] ? y["throw"] || ((t = y["return"]) && t.call(y), 0) : y.next) && !(t = t.call(y, op[1])).done) return t; | ||
if (y = 0, t) op = [op[0] & 2, t.value]; | ||
switch (op[0]) { | ||
case 0: case 1: t = op; break; | ||
case 4: _.label++; return { value: op[1], done: false }; | ||
case 5: _.label++; y = op[1]; op = [0]; continue; | ||
case 7: op = _.ops.pop(); _.trys.pop(); continue; | ||
default: | ||
if (!(t = _.trys, t = t.length > 0 && t[t.length - 1]) && (op[0] === 6 || op[0] === 2)) { _ = 0; continue; } | ||
if (op[0] === 3 && (!t || (op[1] > t[0] && op[1] < t[3]))) { _.label = op[1]; break; } | ||
if (op[0] === 6 && _.label < t[1]) { _.label = t[1]; t = op; break; } | ||
if (t && _.label < t[2]) { _.label = t[2]; _.ops.push(op); break; } | ||
if (t[2]) _.ops.pop(); | ||
_.trys.pop(); continue; | ||
} | ||
op = body.call(thisArg, _); | ||
} catch (e) { op = [6, e]; y = 0; } finally { f = t = 0; } | ||
if (op[0] & 5) throw op[1]; return { value: op[0] ? op[1] : void 0, done: true }; | ||
} | ||
}; | ||
var __importDefault = (this && this.__importDefault) || function (mod) { | ||
@@ -11,39 +47,60 @@ return (mod && mod.__esModule) ? mod : { "default": mod }; | ||
// Returns the number of active connections to the database | ||
var getActiveConnections = function () { | ||
return new Promise(function (resolve, reject) { | ||
var sql = "SELECT count(*) FROM pg_stat_activity"; | ||
var client = new pg_1["default"].Client(common_1.conString); | ||
client.connect(function (connectError) { | ||
if (connectError) | ||
return reject(connectError); | ||
client.query(sql, function (queryError, result) { | ||
if (queryError) | ||
return reject(queryError); | ||
resolve(Number(result.rows[0].count)); | ||
client.end(); | ||
}); | ||
function getActiveConnections() { | ||
return __awaiter(this, void 0, void 0, function () { | ||
var client, sql, result; | ||
return __generator(this, function (_a) { | ||
switch (_a.label) { | ||
case 0: | ||
client = new pg_1["default"].Client(common_1.conString); | ||
_a.label = 1; | ||
case 1: | ||
_a.trys.push([1, , 4, 5]); | ||
return [4 /*yield*/, client.connect()]; | ||
case 2: | ||
_a.sent(); | ||
sql = "SELECT count(*) FROM pg_stat_activity"; | ||
return [4 /*yield*/, client.query(sql)]; | ||
case 3: | ||
result = _a.sent(); | ||
return [2 /*return*/, Number(result.rows[0].count)]; | ||
case 4: | ||
client.end(); | ||
return [7 /*endfinally*/]; | ||
case 5: return [2 /*return*/]; | ||
} | ||
}); | ||
}); | ||
}; | ||
(0, tape_1["default"])("withLock releases connection after unlocking", function (t) { | ||
getActiveConnections().then(function (startConnectionCount) { | ||
var _loop_1 = function (i) { | ||
var createMutex = (0, __1["default"])(common_1.conString); | ||
createMutex("test-withlock-release") | ||
.withLock()["catch"](t.fail) | ||
.then(function () { | ||
createMutex.client.end(); | ||
}); | ||
}; | ||
for (var i = 0; i < 25; i++) { | ||
_loop_1(i); | ||
} | ||
(0, tape_1["default"])("withLock releases connection after unlocking", function (t) { return __awaiter(void 0, void 0, void 0, function () { | ||
var startConnectionCount, i, createMutex, connectionCount; | ||
return __generator(this, function (_a) { | ||
switch (_a.label) { | ||
case 0: return [4 /*yield*/, getActiveConnections()]; | ||
case 1: | ||
startConnectionCount = _a.sent(); | ||
i = 0; | ||
_a.label = 2; | ||
case 2: | ||
if (!(i < 25)) return [3 /*break*/, 5]; | ||
createMutex = (0, __1["default"])(common_1.conString); | ||
return [4 /*yield*/, createMutex("test-withlock-release").withLock(function () { | ||
// do something | ||
})]; | ||
case 3: | ||
_a.sent(); | ||
_a.label = 4; | ||
case 4: | ||
i++; | ||
return [3 /*break*/, 2]; | ||
case 5: return [4 /*yield*/, (0, common_1.timeout)(1000)]; | ||
case 6: | ||
_a.sent(); | ||
return [4 /*yield*/, getActiveConnections()]; | ||
case 7: | ||
connectionCount = _a.sent(); | ||
t.equal(connectionCount, startConnectionCount); | ||
return [2 /*return*/]; | ||
} | ||
(0, common_1.timeout)(500).then(function () { | ||
getActiveConnections().then(function (connectionCount) { | ||
t.equal(connectionCount, startConnectionCount); | ||
t.end(); | ||
}); | ||
}); | ||
}); | ||
}); | ||
}); }); | ||
//# sourceMappingURL=issue-1.test.js.map |
{ | ||
"name": "advisory-lock", | ||
"version": "1.2.0", | ||
"version": "2.0.0", | ||
"description": "Distributed locking using PostgreSQL advisory locks", | ||
@@ -13,2 +13,3 @@ "main": "build/index.js", | ||
"lint": "prettier -c src/", | ||
"lint:fix": "prettier --write src/", | ||
"build": "tsc", | ||
@@ -46,3 +47,3 @@ "release": "npm run build && npm publish && git push && git push --tags", | ||
"prettier": "^2.8.4", | ||
"tape": "^4.5.1", | ||
"tape": "^5.6.3", | ||
"typescript": "^4.9.5" | ||
@@ -49,0 +50,0 @@ }, |
108
README.md
@@ -41,2 +41,29 @@ # advisory-lock | ||
## Example | ||
```javascript | ||
import advisoryLock from "advisory-lock"; | ||
const mutex = advisoryLock("postgres://user:pass@localhost:3475/dbname")( | ||
"some-lock-name" | ||
); | ||
// waits and blocks indefinitely for the lock before executing the function | ||
await mutex.withLock(async () => { | ||
// do something exclusive | ||
// releases lock when promise resolves or rejects | ||
}); | ||
// doesn't "block", just tells us if the lock is available | ||
const unlock = await mutex.tryLock(); | ||
if (unlock) { | ||
// we are now responsible for manually releasing the lock | ||
// do something... | ||
await unlock(); | ||
} else { | ||
throw new Error("could not acquire lock"); | ||
} | ||
``` | ||
See [./test](./test) for more usage examples. | ||
## CLI Usage | ||
@@ -74,12 +101,2 @@ | ||
The `createMutex` function also exposes a `client` property | ||
that can be used to terminate the database connection if necessary. | ||
PS: Each call to `advisoryLock(connectionString)` creates a new PostgreSQL | ||
connection which is not automatically terminated, so if that is an | ||
[issue for you](https://github.com/olalonde/advisory-lock/issues/1), you | ||
can use `createMutex.client.end()` to end the connection when | ||
appropriate (e.g. after releasing a lock). This is however typically | ||
not needed since usually, `advisoryLock()` only needs to be called once. | ||
### createMutex(lockName) | ||
@@ -99,71 +116,14 @@ | ||
- `fn` Promise returning function or regular function to be executed once the lock is acquired | ||
- `fn` Function to be executed once the lock is acquired. | ||
Like `lock()` but automatically release the lock after `fn()` resolves. | ||
Like `lock()` but automatically release the lock after `fn()` is executed. | ||
Returns a promise which resolves to the value `fn` resolves to. | ||
Returns the value returned by `fn()`. | ||
Throws an error if the Postgres connection closes unexpectedly. | ||
#### mutex.tryLock(): UnlockFunction | ||
#### mutex.tryLock() | ||
Returns an `unlock()` function if the lock was acquired and `undefined` otherwise. | ||
Returns a promise which resolves to `true` if the lock is free and | ||
`false` if the lock is taken. Doesn't "block". | ||
#### mutex.lock(): UnlockFunction | ||
#### mutex.lock() | ||
Wait until we get exclusive lock. | ||
#### mutex.unlock() | ||
Release the exclusive lock. | ||
#### mutex.tryLockShared() | ||
Like `tryLock()` but for shared lock. | ||
#### mutex.lockShared() | ||
While held, this blocks any attempt to obtain an exclusive lock. (e.g.: calls to `.lock()` or `.withLock()`) | ||
#### mutex.unlockShared() | ||
Release shared lock. | ||
#### mutex.withLockShared(fn) | ||
Same as `withLock()` but using a shared lock. | ||
## Example | ||
```javascript | ||
import advisoryLock from "advisory-lock"; | ||
const mutex = advisoryLock("postgres://user:pass@localhost:3475/dbname")( | ||
"some-lock-name" | ||
); | ||
const doSomething = () => { | ||
// doSomething | ||
return Promise.resolve(); | ||
}; | ||
mutex | ||
.withLock(doSomething) // "blocks" until lock is free | ||
.catch((err) => { | ||
// this gets executed if the postgres connection closes unexpectedly, etc. | ||
}) | ||
.then(() => { | ||
// lock is released now... | ||
}); | ||
// doesn't "block" | ||
mutex.tryLock().then((obtainedLock) => { | ||
if (obtainedLock) { | ||
return doSomething().then(() => mutex.unlock()); | ||
} else { | ||
throw new Error("failed to obtain lock"); | ||
} | ||
}); | ||
``` | ||
See [./test](./test) for more usage examples. | ||
Blocks and waits for lock acquisition and returns an `unlock()` function. |
202
src/index.ts
@@ -1,2 +0,2 @@ | ||
import pg from "pg"; | ||
import pg, { Pool } from "pg"; | ||
import { createHash } from "crypto"; | ||
@@ -20,125 +20,113 @@ import initDebug from "debug"; | ||
// Patches client so that unref works as expected... Node terminates | ||
// only if there are not pending queries | ||
const patchClient = (client: pg.Client) => { | ||
const connect = client.connect.bind(client); | ||
const query = client.query.bind(client); | ||
let refCount = 0; | ||
// TODO: fix unref? | ||
const ref = () => { | ||
refCount++; | ||
/* @ts-ignore */ | ||
client.connection.stream.ref(); | ||
}; | ||
const unref = () => { | ||
refCount--; | ||
/* @ts-ignore */ | ||
if (!refCount) client.connection.stream.unref(); | ||
}; | ||
type AdvisoryKey = [number, number]; | ||
const wrap = | ||
(fn: Function) => | ||
(...args: []) => { | ||
ref(); | ||
const lastArg = args[args.length - 1]; | ||
const lastArgIsCb = typeof lastArg === "function"; | ||
const outerCb = lastArgIsCb ? lastArg : noop; | ||
if (lastArgIsCb) args.pop(); | ||
const cb = (...cbArgs: []) => { | ||
unref(); | ||
outerCb(...cbArgs); | ||
}; | ||
/* @ts-ignore */ | ||
args.push(cb); | ||
return fn(...args); | ||
}; | ||
async function query( | ||
client: pg.Client, | ||
lockFn: string, | ||
[key1, key2]: AdvisoryKey | ||
): Promise<boolean> { | ||
const sql = `SELECT ${lockFn}(${key1}, ${key2})`; | ||
debug(`query: ${sql}`); | ||
const result = await client.query(sql); | ||
return result.rows[0][lockFn] as boolean; | ||
} | ||
client.connect = wrap(connect); | ||
client.query = wrap(query); | ||
return client; | ||
}; | ||
interface CreateMutexFunction { | ||
(lockName: string): AdvisoryLock; | ||
} | ||
type AdvisoryKey = [number, number]; | ||
type WithLockFunction = (fn: () => Promise<unknown>) => Promise<unknown>; | ||
const query = (client: pg.Client, lockFn: string, [key1, key2]: AdvisoryKey) => | ||
new Promise((resolve, reject) => { | ||
const sql = `SELECT ${lockFn}(${key1}, ${key2})`; | ||
debug(`query: ${sql}`); | ||
client.query(sql, (err, result) => { | ||
if (err) { | ||
debug(err); | ||
return reject(err); | ||
} | ||
resolve(result.rows[0][lockFn]); | ||
}); | ||
}); | ||
type UnlockFn = () => Promise<void>; | ||
// Pauses promise chain until pg client is connected | ||
const initWaitForConnection = (client: pg.Client) => { | ||
const queue: [(value?: any) => void, (err: any) => void][] = []; | ||
let waitForConnect = true; | ||
debug("connecting"); | ||
client.connect((err) => { | ||
waitForConnect = false; | ||
if (err) { | ||
debug("connection error"); | ||
debug(err); | ||
queue.forEach(([, reject]) => reject(err)); | ||
} else { | ||
debug("connected"); | ||
queue.forEach(([resolve]) => resolve()); | ||
} | ||
}); | ||
return () => | ||
new Promise<void>((resolve, reject) => { | ||
if (!waitForConnect) return resolve(); | ||
debug("waiting for connection"); | ||
queue.push([resolve, reject]); | ||
}); | ||
}; | ||
interface FunctionObject { | ||
[key: string]: (...args: any[]) => any; | ||
interface AdvisoryLock { | ||
lock: () => Promise<UnlockFn>; | ||
unlock: UnlockFn; | ||
tryLock: () => Promise<UnlockFn | undefined>; | ||
withLock: WithLockFunction; | ||
} | ||
export default (conString: string) => { | ||
export default (conString: string): CreateMutexFunction => { | ||
debug(`connection string: ${conString}`); | ||
const client = patchClient(new pg.Client(conString)); | ||
const waitForConnection = initWaitForConnection(client); | ||
// TODO: client.connection.stream.unref()? | ||
const createMutex = (name: string) => { | ||
const createMutex: CreateMutexFunction = (name: string) => { | ||
const key = typeof name === "string" ? strToKey(name) : name; | ||
const lock = () => query(client, "pg_advisory_lock", key); | ||
const unlock = () => query(client, "pg_advisory_unlock", key); | ||
const tryLock = () => query(client, "pg_try_advisory_lock", key); | ||
async function newClient(): Promise<pg.Client> { | ||
const client = new pg.Client({ | ||
connectionString: conString, | ||
}); | ||
await client.connect(); | ||
return client; | ||
} | ||
// for backwards compatibility... | ||
let cachedUnlock: undefined | UnlockFn; | ||
async function unlock() { | ||
if (cachedUnlock) { | ||
return cachedUnlock(); | ||
} | ||
// no op | ||
} | ||
// lock and unlock share a client because the lock is tied to a connection | ||
async function lock(): Promise<UnlockFn> { | ||
const client = await newClient(); | ||
try { | ||
await query(client, "pg_advisory_lock", key); | ||
// For backwards compatibility we assign it to unlock | ||
const unlockFn = async function unlock() { | ||
try { | ||
await query(client, "pg_advisory_unlock", key); | ||
} finally { | ||
client.end(); | ||
} | ||
}; | ||
cachedUnlock = unlockFn; | ||
return unlockFn; | ||
} catch (err) { | ||
client.end(); | ||
throw err; | ||
} | ||
} | ||
async function tryLock() { | ||
const client = await newClient(); | ||
try { | ||
const obtained = await query(client, "pg_try_advisory_lock", key); | ||
if (obtained) { | ||
// For backwards compatibility we assign it to unlock | ||
const unlockFn = async function unlock() { | ||
try { | ||
await query(client, "pg_advisory_unlock", key); | ||
} finally { | ||
client.end(); | ||
} | ||
}; | ||
cachedUnlock = unlockFn; | ||
return unlockFn; | ||
} else { | ||
client.end(); | ||
} | ||
} catch (err) { | ||
client.end(); | ||
throw err; | ||
} | ||
} | ||
// TODO: catch db disconnection errors? | ||
const withLock = (fn: () => void) => | ||
lock().then(() => | ||
Promise.resolve() | ||
.then(fn) | ||
.then( | ||
(res) => unlock().then(() => res), | ||
(err) => | ||
unlock().then(() => { | ||
throw err; | ||
}) | ||
) | ||
); | ||
const withLock: WithLockFunction = async function (fn = async () => {}) { | ||
const unlock = await lock(); | ||
try { | ||
return await fn(); | ||
} finally { | ||
await unlock(); | ||
} | ||
}; | ||
const fns: FunctionObject = { lock, unlock, tryLock, withLock }; | ||
// "Block" function calls until client is connected | ||
const guardedFns: FunctionObject = {}; | ||
Object.keys(fns).forEach((fnName) => { | ||
guardedFns[fnName] = (...args) => | ||
waitForConnection().then(() => fns[fnName](...args)); | ||
}); | ||
return guardedFns; | ||
return { lock, unlock, tryLock, withLock }; | ||
}; | ||
createMutex.client = client; | ||
return createMutex; | ||
}; |
@@ -16,3 +16,3 @@ import test from "tape"; | ||
test("lock/unlock on same connection", (t) => { | ||
test("lock/unlock", (t) => { | ||
t.plan(3); | ||
@@ -26,17 +26,17 @@ | ||
const testLockUnlock = (iVal) => | ||
lock() | ||
.then(() => { | ||
t.equal(i, iVal, "i is equal to 0"); | ||
i++; | ||
// wait 300ms before decrementing i | ||
return timeout(300); | ||
}) | ||
.then(() => i--) | ||
.then(unlock) | ||
.catch(t.fail); | ||
testLockUnlock(0); | ||
testLockUnlock(1); | ||
testLockUnlock(2); | ||
const testLockUnlock = async () => { | ||
try { | ||
await lock(); | ||
t.equal(i, 0, `${i} is equal to 0`); | ||
i++; | ||
await timeout(300); | ||
i--; | ||
await unlock(); | ||
} catch (err) { | ||
t.fail(err); | ||
} | ||
}; | ||
testLockUnlock(); | ||
testLockUnlock(); | ||
testLockUnlock(); | ||
// we can acquire lock both times because we're using the same connection | ||
@@ -71,44 +71,31 @@ }); | ||
test("tryLock", (t) => { | ||
test("tryLock", async (t) => { | ||
function assertAcquired(val) { | ||
t.equal(typeof val, "function", "acquired"); | ||
} | ||
function assertNotAcquired(val) { | ||
t.equal(typeof val, "undefined", "not acquired"); | ||
} | ||
const mutex1 = advisoryLock(conString)("test-try-lock"); | ||
const mutex2 = advisoryLock(conString)("test-try-lock"); | ||
mutex1 | ||
.tryLock() | ||
.then((obtained) => { | ||
t.equal(obtained, true); | ||
}) | ||
.then(() => mutex2.tryLock()) | ||
.then((obtained) => { | ||
t.equal(obtained, false); | ||
}) | ||
.then(() => mutex1.unlock()) | ||
.then(() => mutex2.tryLock()) | ||
.then((obtained) => { | ||
t.equal(obtained, true); | ||
}) | ||
.then(() => mutex1.tryLock()) | ||
.then((obtained) => { | ||
t.equal(obtained, false); | ||
}) | ||
.then(() => mutex2.unlock()) | ||
.then(() => t.end()) | ||
.catch(t.fail); | ||
assertAcquired(await mutex1.tryLock()); | ||
assertNotAcquired(await mutex2.tryLock()); | ||
await mutex1.unlock(); | ||
assertAcquired(await mutex2.tryLock()); | ||
assertNotAcquired(await mutex1.tryLock()); | ||
await mutex2.unlock(); | ||
}); | ||
test("withLock followed by tryLock", (t) => { | ||
test("withLock followed by tryLock", async (t) => { | ||
const mutex1 = advisoryLock(conString)("test-withlock-lock"); | ||
const mutex2 = advisoryLock(conString)("test-withlock-lock"); | ||
mutex1 | ||
.withLock(() => | ||
mutex2 | ||
.tryLock() | ||
.then((obtained) => t.equal(obtained, false)) | ||
.then(() => "someval") | ||
) | ||
.then((res) => t.equal(res, "someval")) | ||
.then(() => mutex2.tryLock()) | ||
.then((obtained) => t.equal(obtained, true)) | ||
.then(() => mutex2.unlock()) | ||
.then(() => t.end()) | ||
.catch(t.fail); | ||
const val = await mutex1.withLock(async () => { | ||
const unlock = await mutex2.tryLock(); | ||
t.equal(typeof unlock, "undefined"); | ||
return "someval"; | ||
}); | ||
t.equal(val, "someval"); | ||
const unlock = await mutex2.tryLock(); | ||
t.equal(typeof unlock, "function"); | ||
await unlock(); | ||
}); | ||
@@ -115,0 +102,0 @@ |
@@ -7,34 +7,25 @@ import test from "tape"; | ||
// Returns the number of active connections to the database | ||
const getActiveConnections = () => | ||
new Promise((resolve, reject) => { | ||
async function getActiveConnections() { | ||
const client = new pg.Client(conString); | ||
try { | ||
await client.connect(); | ||
const sql = "SELECT count(*) FROM pg_stat_activity"; | ||
const client = new pg.Client(conString); | ||
client.connect((connectError) => { | ||
if (connectError) return reject(connectError); | ||
client.query(sql, (queryError, result) => { | ||
if (queryError) return reject(queryError); | ||
resolve(Number(result.rows[0].count)); | ||
client.end(); | ||
}); | ||
}); | ||
}); | ||
const result = await client.query(sql); | ||
return Number(result.rows[0].count); | ||
} finally { | ||
client.end(); | ||
} | ||
} | ||
test("withLock releases connection after unlocking", (t) => { | ||
getActiveConnections().then((startConnectionCount) => { | ||
for (let i = 0; i < 25; i++) { | ||
const createMutex = advisoryLock(conString); | ||
createMutex("test-withlock-release") | ||
.withLock() | ||
.catch(t.fail) | ||
.then(() => { | ||
createMutex.client.end(); | ||
}); | ||
} | ||
timeout(500).then(() => { | ||
getActiveConnections().then((connectionCount) => { | ||
t.equal(connectionCount, startConnectionCount); | ||
t.end(); | ||
}); | ||
test("withLock releases connection after unlocking", async (t) => { | ||
const startConnectionCount = await getActiveConnections(); | ||
for (let i = 0; i < 25; i++) { | ||
const createMutex = advisoryLock(conString); | ||
await createMutex("test-withlock-release").withLock(() => { | ||
// do something | ||
}); | ||
}); | ||
} | ||
await timeout(1000); | ||
const connectionCount = await getActiveConnections(); | ||
t.equal(connectionCount, startConnectionCount); | ||
}); |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
188573
1024
127