bottleneck

bottleneck npm package: compare versions

Comparing version 2.15.1 to 2.15.2

src/redis/blacklist_client.lua

bower.json
{
"name": "bottleneck",
"main": "bottleneck.js",
"version": "2.15.1",
"version": "2.15.2",
"homepage": "https://github.com/SGrondin/bottleneck",

@@ -6,0 +6,0 @@ "authors": [

@@ -273,3 +273,3 @@ "use strict";

if (this.queued() === 0) {
return this.Promise.resolve(false);
return this.Promise.resolve(null);
}

@@ -285,3 +285,3 @@

if (capacity != null && options.weight > capacity) {
return this.Promise.resolve(false);
return this.Promise.resolve(null);
}

@@ -319,5 +319,7 @@

this._run(next, wait, index, 0);
return this.Promise.resolve(options.weight);
} else {
return this.Promise.resolve(null);
}
return this.Promise.resolve(success);
});

@@ -327,8 +329,11 @@ });

_drainAll(capacity) {
return this._drainOne(capacity).then(success => {
if (success) {
return this._drainAll();
_drainAll(capacity, total = 0) {
return this._drainOne(capacity).then(drained => {
var newCapacity;
if (drained != null) {
newCapacity = capacity != null ? capacity - drained : capacity;
return this._drainAll(newCapacity, total + drained);
} else {
return this.Promise.resolve(success);
return this.Promise.resolve(total);
}

@@ -388,3 +393,3 @@ }).catch(e => {

}, this._drainOne = () => {
return this.Promise.resolve(false);
return this.Promise.resolve(null);
}, this._registerLock.schedule(() => {

@@ -391,0 +396,0 @@ return this._submitLock.schedule(() => {
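
Taken together, the _drainOne/_drainAll hunks above change the drain contract: _drainOne now resolves with the weight of the job it just started (or null when the queue is empty, the job does not fit the remaining capacity, or registration fails) instead of a boolean, and _drainAll subtracts that weight from the remaining capacity while accumulating a running total, which it resolves with once nothing more can be drained. A minimal standalone sketch of the pattern, with drainOne standing in for the limiter's internal _drainOne method (illustrative only, not the library's exact code):

  // drainOne(capacity) is assumed to resolve with the weight it scheduled,
  // or null when nothing more can be drained.
  function drainAll(drainOne, capacity, total = 0) {
    return drainOne(capacity).then((drained) => {
      if (drained != null) {
        // Shrink the remaining capacity by the weight just scheduled and recurse.
        const newCapacity = capacity != null ? capacity - drained : capacity;
        return drainAll(drainOne, newCapacity, total + drained);
      }
      // Done draining: resolve with the total weight started in this pass.
      return Promise.resolve(total);
    });
  }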

{
"blacklist_client.lua": "local blacklist = ARGV[3]\n\nredis.call('zadd', client_last_seen_key, 0, blacklist)\n\nreturn {}\n",
"check.lua": "local weight = tonumber(ARGV[3])\n\nlocal capacity = process_tick(now, false)['capacity']\nlocal nextRequest = tonumber(redis.call('hget', settings_key, 'nextRequest'))\n\nreturn conditions_check(capacity, weight) and nextRequest - now <= 0\n",

@@ -11,6 +12,6 @@ "conditions_check.lua": "local conditions_check = function (capacity, weight)\n return capacity == nil or weight <= capacity\nend\n",

"increment_reservoir.lua": "local incr = tonumber(ARGV[3])\n\nredis.call('hincrby', settings_key, 'reservoir', incr)\n\nlocal reservoir = process_tick(now, true)['reservoir']\n\nlocal groupTimeout = tonumber(redis.call('hget', settings_key, 'groupTimeout'))\nrefresh_expiration(0, 0, groupTimeout)\n\nreturn reservoir\n",
"init.lua": "local clear = tonumber(ARGV[3])\nlocal limiter_version = ARGV[4]\nlocal num_static_argv = 4\n\nif clear == 1 then\n redis.call('del', unpack(KEYS))\nend\n\nif redis.call('exists', settings_key) == 0 then\n -- Create\n local args = {'hmset', settings_key}\n\n for i = num_static_argv + 1, #ARGV do\n table.insert(args, ARGV[i])\n end\n\n redis.call(unpack(args))\n redis.call('hmset', settings_key,\n 'nextRequest', now,\n 'lastReservoirRefresh', now,\n 'running', 0,\n 'done', 0,\n 'unblockTime', 0\n )\n\nelse\n -- Apply migrations\n local settings = redis.call('hmget', settings_key,\n 'id',\n 'version'\n )\n local id = settings[1]\n local current_version = settings[2]\n\n if current_version ~= limiter_version then\n local version_digits = {}\n for k, v in string.gmatch(current_version, \"([^.]+)\") do\n table.insert(version_digits, tonumber(k))\n end\n\n -- 2.10.0\n if version_digits[2] < 10 then\n redis.call('hsetnx', settings_key, 'reservoirRefreshInterval', '')\n redis.call('hsetnx', settings_key, 'reservoirRefreshAmount', '')\n redis.call('hsetnx', settings_key, 'lastReservoirRefresh', '')\n redis.call('hsetnx', settings_key, 'done', 0)\n redis.call('hset', settings_key, 'version', '2.10.0')\n end\n\n -- 2.11.1\n if version_digits[2] < 11 and version_digits[3] < 1 then\n if redis.call('hstrlen', settings_key, 'lastReservoirRefresh') == 0 then\n redis.call('hmset', settings_key,\n 'lastReservoirRefresh', now,\n 'version', '2.11.1'\n )\n end\n end\n\n -- 2.14.0\n if version_digits[2] < 14 then\n local old_running_key = 'b_'..id..'_running'\n local old_executing_key = 'b_'..id..'_executing'\n\n if redis.call('exists', old_running_key) == 1 then\n redis.call('rename', old_running_key, job_weights_key)\n end\n if redis.call('exists', old_executing_key) == 1 then\n redis.call('rename', old_executing_key, job_expirations_key)\n end\n redis.call('hset', settings_key, 'version', '2.14.0')\n end\n\n end\n\n process_tick(now, false)\nend\n\nlocal groupTimeout = tonumber(redis.call('hget', settings_key, 'groupTimeout'))\nrefresh_expiration(0, 0, groupTimeout)\n\nreturn {}\n",
"process_tick.lua": "local process_tick = function (now, always_publish)\n\n local compute_capacity = function (maxConcurrent, running, reservoir)\n if maxConcurrent ~= nil and reservoir ~= nil then\n return math.min((maxConcurrent - running), reservoir)\n elseif maxConcurrent ~= nil then\n return maxConcurrent - running\n elseif reservoir ~= nil then\n return reservoir\n else\n return nil\n end\n end\n\n local settings = redis.call('hmget', settings_key,\n 'id',\n 'maxConcurrent',\n 'running',\n 'reservoir',\n 'reservoirRefreshInterval',\n 'reservoirRefreshAmount',\n 'lastReservoirRefresh'\n )\n local id = settings[1]\n local maxConcurrent = tonumber(settings[2])\n local running = tonumber(settings[3])\n local reservoir = tonumber(settings[4])\n local reservoirRefreshInterval = tonumber(settings[5])\n local reservoirRefreshAmount = tonumber(settings[6])\n local lastReservoirRefresh = tonumber(settings[7])\n\n local initial_capacity = compute_capacity(maxConcurrent, running, reservoir)\n\n --\n -- Process 'running' changes\n --\n local expired = redis.call('zrangebyscore', job_expirations_key, '-inf', '('..now)\n\n if #expired > 0 then\n redis.call('zremrangebyscore', job_expirations_key, '-inf', '('..now)\n\n local flush_batch = function (batch, acc)\n local weights = redis.call('hmget', job_weights_key, unpack(batch))\n redis.call('hdel', job_weights_key, unpack(batch))\n local clients = redis.call('hmget', job_clients_key, unpack(batch))\n redis.call('hdel', job_clients_key, unpack(batch))\n\n -- Calculate sum of removed weights\n for i = 1, #weights do\n acc['total'] = acc['total'] + (tonumber(weights[i]) or 0)\n end\n\n -- Calculate sum of removed weights by client\n local client_weights = {}\n for i = 1, #clients do\n if weights[i] ~= nil then\n acc['client_weights'][clients[i]] = (acc['client_weights'][clients[i]] or 0) + tonumber(weights[i])\n end\n end\n end\n\n local acc = {\n ['total'] = 0,\n ['client_weights'] = {}\n }\n local batch_size = 1000\n\n -- Compute changes to Zsets and apply changes to Hashes\n for i = 1, #expired, batch_size do\n local batch = {}\n for j = i, math.min(i + batch_size - 1, #expired) do\n table.insert(batch, expired[j])\n end\n\n flush_batch(batch, acc)\n end\n\n -- Apply changes to Zsets\n if acc['total'] > 0 then\n redis.call('hincrby', settings_key, 'done', acc['total'])\n running = tonumber(redis.call('hincrby', settings_key, 'running', -acc['total']))\n end\n\n for client, weight in pairs(acc['client_weights']) do\n redis.call('zincrby', client_running_key, -weight, client)\n end\n end\n\n --\n -- Process 'reservoir' changes\n --\n local reservoirRefreshActive = reservoirRefreshInterval ~= nil and reservoirRefreshAmount ~= nil\n if reservoirRefreshActive and now >= lastReservoirRefresh + reservoirRefreshInterval then\n reservoir = reservoirRefreshAmount\n redis.call('hmset', settings_key,\n 'reservoir', reservoir,\n 'lastReservoirRefresh', now\n )\n end\n\n --\n -- Broadcast capacity changes\n --\n local final_capacity = compute_capacity(maxConcurrent, running, reservoir)\n\n if always_publish or (initial_capacity ~= nil and final_capacity == nil) then\n -- always_publish or was not unlimited, now unlimited\n redis.call('publish', 'b_'..id, 'capacity:'..(final_capacity or ''))\n\n elseif initial_capacity ~= nil and final_capacity ~= nil and final_capacity > initial_capacity then\n -- capacity was increased\n -- send the capacity message to the limiter having the lowest number of running jobs\n -- the tiebreaker is the limiter having not registered 
a job in the longest time\n\n local lowest_concurrency_value = nil\n local lowest_concurrency_clients = {}\n local lowest_concurrency_last_registered = {}\n local client_concurrencies = redis.call('zrange', client_running_key, 0, -1, 'withscores')\n\n for i = 1, #client_concurrencies, 2 do\n local client = client_concurrencies[i]\n local concurrency = tonumber(client_concurrencies[i+1])\n\n if (\n lowest_concurrency_value == nil or lowest_concurrency_value == concurrency\n ) and (\n tonumber(redis.call('hget', client_num_queued_key, client)) > 0\n ) then\n lowest_concurrency_value = concurrency\n table.insert(lowest_concurrency_clients, client)\n local last_registered = tonumber(redis.call('zscore', client_last_registered_key, client))\n table.insert(lowest_concurrency_last_registered, last_registered)\n end\n end\n\n if #lowest_concurrency_clients > 0 then\n local position = 1\n local earliest = lowest_concurrency_last_registered[1]\n\n for i,v in ipairs(lowest_concurrency_last_registered) do\n if v < earliest then\n position = i\n earliest = v\n end\n end\n\n local next_client = lowest_concurrency_clients[position]\n redis.call('publish', 'b_'..id, 'capacity-priority:'..(final_capacity or '')..':'..next_client)\n else\n redis.call('publish', 'b_'..id, 'capacity:'..(final_capacity or ''))\n end\n end\n\n return {\n ['capacity'] = final_capacity,\n ['running'] = running,\n ['reservoir'] = reservoir\n }\nend\n",
"init.lua": "local clear = tonumber(ARGV[3])\nlocal limiter_version = ARGV[4]\nlocal num_static_argv = 4\n\nif clear == 1 then\n redis.call('del', unpack(KEYS))\nend\n\nif redis.call('exists', settings_key) == 0 then\n -- Create\n local args = {'hmset', settings_key}\n\n for i = num_static_argv + 1, #ARGV do\n table.insert(args, ARGV[i])\n end\n\n redis.call(unpack(args))\n redis.call('hmset', settings_key,\n 'nextRequest', now,\n 'lastReservoirRefresh', now,\n 'running', 0,\n 'done', 0,\n 'unblockTime', 0,\n 'capacityPriorityCounter', 0\n )\n\nelse\n -- Apply migrations\n local settings = redis.call('hmget', settings_key,\n 'id',\n 'version'\n )\n local id = settings[1]\n local current_version = settings[2]\n\n if current_version ~= limiter_version then\n local version_digits = {}\n for k, v in string.gmatch(current_version, \"([^.]+)\") do\n table.insert(version_digits, tonumber(k))\n end\n\n -- 2.10.0\n if version_digits[2] < 10 then\n redis.call('hsetnx', settings_key, 'reservoirRefreshInterval', '')\n redis.call('hsetnx', settings_key, 'reservoirRefreshAmount', '')\n redis.call('hsetnx', settings_key, 'lastReservoirRefresh', '')\n redis.call('hsetnx', settings_key, 'done', 0)\n redis.call('hset', settings_key, 'version', '2.10.0')\n end\n\n -- 2.11.1\n if version_digits[2] < 11 and version_digits[3] < 1 then\n if redis.call('hstrlen', settings_key, 'lastReservoirRefresh') == 0 then\n redis.call('hmset', settings_key,\n 'lastReservoirRefresh', now,\n 'version', '2.11.1'\n )\n end\n end\n\n -- 2.14.0\n if version_digits[2] < 14 then\n local old_running_key = 'b_'..id..'_running'\n local old_executing_key = 'b_'..id..'_executing'\n\n if redis.call('exists', old_running_key) == 1 then\n redis.call('rename', old_running_key, job_weights_key)\n end\n if redis.call('exists', old_executing_key) == 1 then\n redis.call('rename', old_executing_key, job_expirations_key)\n end\n redis.call('hset', settings_key, 'version', '2.14.0')\n end\n\n -- 2.15.2\n if version_digits[2] < 15 and version_digits[3] < 2 then\n redis.call('hsetnx', settings_key, 'capacityPriorityCounter', 0)\n redis.call('hset', settings_key, 'version', '2.15.2')\n end\n\n end\n\n process_tick(now, false)\nend\n\nlocal groupTimeout = tonumber(redis.call('hget', settings_key, 'groupTimeout'))\nrefresh_expiration(0, 0, groupTimeout)\n\nreturn {}\n",
"process_tick.lua": "local process_tick = function (now, always_publish)\n\n local compute_capacity = function (maxConcurrent, running, reservoir)\n if maxConcurrent ~= nil and reservoir ~= nil then\n return math.min((maxConcurrent - running), reservoir)\n elseif maxConcurrent ~= nil then\n return maxConcurrent - running\n elseif reservoir ~= nil then\n return reservoir\n else\n return nil\n end\n end\n\n local settings = redis.call('hmget', settings_key,\n 'id',\n 'maxConcurrent',\n 'running',\n 'reservoir',\n 'reservoirRefreshInterval',\n 'reservoirRefreshAmount',\n 'lastReservoirRefresh',\n 'capacityPriorityCounter'\n )\n local id = settings[1]\n local maxConcurrent = tonumber(settings[2])\n local running = tonumber(settings[3])\n local reservoir = tonumber(settings[4])\n local reservoirRefreshInterval = tonumber(settings[5])\n local reservoirRefreshAmount = tonumber(settings[6])\n local lastReservoirRefresh = tonumber(settings[7])\n local capacityPriorityCounter = tonumber(settings[8])\n\n local initial_capacity = compute_capacity(maxConcurrent, running, reservoir)\n\n --\n -- Process 'running' changes\n --\n local expired = redis.call('zrangebyscore', job_expirations_key, '-inf', '('..now)\n\n if #expired > 0 then\n redis.call('zremrangebyscore', job_expirations_key, '-inf', '('..now)\n\n local flush_batch = function (batch, acc)\n local weights = redis.call('hmget', job_weights_key, unpack(batch))\n redis.call('hdel', job_weights_key, unpack(batch))\n local clients = redis.call('hmget', job_clients_key, unpack(batch))\n redis.call('hdel', job_clients_key, unpack(batch))\n\n -- Calculate sum of removed weights\n for i = 1, #weights do\n acc['total'] = acc['total'] + (tonumber(weights[i]) or 0)\n end\n\n -- Calculate sum of removed weights by client\n local client_weights = {}\n for i = 1, #clients do\n if weights[i] ~= nil then\n acc['client_weights'][clients[i]] = (acc['client_weights'][clients[i]] or 0) + tonumber(weights[i])\n end\n end\n end\n\n local acc = {\n ['total'] = 0,\n ['client_weights'] = {}\n }\n local batch_size = 1000\n\n -- Compute changes to Zsets and apply changes to Hashes\n for i = 1, #expired, batch_size do\n local batch = {}\n for j = i, math.min(i + batch_size - 1, #expired) do\n table.insert(batch, expired[j])\n end\n\n flush_batch(batch, acc)\n end\n\n -- Apply changes to Zsets\n if acc['total'] > 0 then\n redis.call('hincrby', settings_key, 'done', acc['total'])\n running = tonumber(redis.call('hincrby', settings_key, 'running', -acc['total']))\n end\n\n for client, weight in pairs(acc['client_weights']) do\n redis.call('zincrby', client_running_key, -weight, client)\n end\n end\n\n --\n -- Process 'reservoir' changes\n --\n local reservoirRefreshActive = reservoirRefreshInterval ~= nil and reservoirRefreshAmount ~= nil\n if reservoirRefreshActive and now >= lastReservoirRefresh + reservoirRefreshInterval then\n reservoir = reservoirRefreshAmount\n redis.call('hmset', settings_key,\n 'reservoir', reservoir,\n 'lastReservoirRefresh', now\n )\n end\n\n --\n -- Broadcast capacity changes\n --\n local final_capacity = compute_capacity(maxConcurrent, running, reservoir)\n\n if always_publish or (initial_capacity ~= nil and final_capacity == nil) then\n -- always_publish or was not unlimited, now unlimited\n redis.call('publish', 'b_'..id, 'capacity:'..(final_capacity or ''))\n\n elseif initial_capacity ~= nil and final_capacity ~= nil and final_capacity > initial_capacity then\n -- capacity was increased\n -- send the capacity message to the limiter having the 
lowest number of running jobs\n -- the tiebreaker is the limiter having not registered a job in the longest time\n\n local lowest_concurrency_value = nil\n local lowest_concurrency_clients = {}\n local lowest_concurrency_last_registered = {}\n local client_concurrencies = redis.call('zrange', client_running_key, 0, -1, 'withscores')\n local valid_clients = redis.call('zrangebyscore', client_last_seen_key, (now - 10000), 'inf')\n local valid_clients_lookup = {}\n for i = 1, #valid_clients do\n valid_clients_lookup[valid_clients[i]] = true\n end\n\n for i = 1, #client_concurrencies, 2 do\n local client = client_concurrencies[i]\n local concurrency = tonumber(client_concurrencies[i+1])\n\n if (\n lowest_concurrency_value == nil or lowest_concurrency_value == concurrency\n ) and (\n valid_clients_lookup[client]\n ) and (\n tonumber(redis.call('hget', client_num_queued_key, client)) > 0\n ) then\n lowest_concurrency_value = concurrency\n table.insert(lowest_concurrency_clients, client)\n local last_registered = tonumber(redis.call('zscore', client_last_registered_key, client))\n table.insert(lowest_concurrency_last_registered, last_registered)\n end\n end\n\n if #lowest_concurrency_clients > 0 then\n local position = 1\n local earliest = lowest_concurrency_last_registered[1]\n\n for i,v in ipairs(lowest_concurrency_last_registered) do\n if v < earliest then\n position = i\n earliest = v\n end\n end\n\n local next_client = lowest_concurrency_clients[position]\n redis.call('publish', 'b_'..id,\n 'capacity-priority:'..(final_capacity or '')..\n ':'..next_client..\n ':'..capacityPriorityCounter\n )\n redis.call('hincrby', settings_key, 'capacityPriorityCounter', '1')\n else\n redis.call('publish', 'b_'..id, 'capacity:'..(final_capacity or ''))\n end\n end\n\n return {\n ['capacity'] = final_capacity,\n ['running'] = running,\n ['reservoir'] = reservoir\n }\nend\n",
"refresh_expiration.lua": "local refresh_expiration = function (now, nextRequest, groupTimeout)\n\n if groupTimeout ~= nil then\n local ttl = (nextRequest + groupTimeout) - now\n\n for i = 1, #KEYS do\n redis.call('pexpire', KEYS[i], ttl)\n end\n end\n\nend\n",
"refs.lua": "local settings_key = KEYS[1]\nlocal job_weights_key = KEYS[2]\nlocal job_expirations_key = KEYS[3]\nlocal job_clients_key = KEYS[4]\nlocal client_running_key = KEYS[5]\nlocal client_num_queued_key = KEYS[6]\nlocal client_last_registered_key = KEYS[7]\n\nlocal now = tonumber(ARGV[1])\nlocal client = ARGV[2]\n",
"refs.lua": "local settings_key = KEYS[1]\nlocal job_weights_key = KEYS[2]\nlocal job_expirations_key = KEYS[3]\nlocal job_clients_key = KEYS[4]\nlocal client_running_key = KEYS[5]\nlocal client_num_queued_key = KEYS[6]\nlocal client_last_registered_key = KEYS[7]\nlocal client_last_seen_key = KEYS[8]\n\nlocal now = tonumber(ARGV[1])\nlocal client = ARGV[2]\n\nredis.call('zadd', client_last_seen_key, now, client)\n",
"register.lua": "local index = ARGV[3]\nlocal weight = tonumber(ARGV[4])\nlocal expiration = tonumber(ARGV[5])\n\nlocal state = process_tick(now, false)\nlocal capacity = state['capacity']\nlocal reservoir = state['reservoir']\n\nlocal settings = redis.call('hmget', settings_key,\n 'nextRequest',\n 'minTime',\n 'groupTimeout'\n)\nlocal nextRequest = tonumber(settings[1])\nlocal minTime = tonumber(settings[2])\nlocal groupTimeout = tonumber(settings[3])\n\nif conditions_check(capacity, weight) then\n\n redis.call('hincrby', settings_key, 'running', weight)\n redis.call('hset', job_weights_key, index, weight)\n if expiration ~= nil then\n redis.call('zadd', job_expirations_key, now + expiration, index)\n end\n redis.call('hset', job_clients_key, index, client)\n redis.call('zincrby', client_running_key, weight, client)\n redis.call('hincrby', client_num_queued_key, client, -1)\n redis.call('zadd', client_last_registered_key, now, client)\n\n local wait = math.max(nextRequest - now, 0)\n local newNextRequest = now + wait + minTime\n\n if reservoir == nil then\n redis.call('hset', settings_key,\n 'nextRequest', newNextRequest\n )\n else\n reservoir = reservoir - weight\n redis.call('hmset', settings_key,\n 'reservoir', reservoir,\n 'nextRequest', newNextRequest\n )\n end\n\n refresh_expiration(now, newNextRequest, groupTimeout)\n\n return {true, wait, reservoir}\n\nelse\n return {false}\nend\n",

@@ -17,0 +18,0 @@ "register_client.lua": "local queued = tonumber(ARGV[3])\n\nredis.call('zadd', client_running_key, 0, client)\nredis.call('hset', client_num_queued_key, client, queued)\nredis.call('zadd', client_last_registered_key, 0, client)\n\nreturn {}\n",
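
The Lua changes above introduce a per-limiter "last seen" ZSET. The shared refs.lua prologue now runs redis.call('zadd', client_last_seen_key, now, client) on every script invocation, so each connected client refreshes its timestamp whenever it talks to Redis, while the new blacklist_client.lua resets a client's score to 0, pushing it outside the 10-second window that the updated process_tick.lua consults before sending a capacity-priority message. Roughly, the eligibility rule is the following (a sketch in JavaScript for illustration; the real check runs in Lua inside Redis):

  // 10000 ms is the cutoff used by process_tick.lua ('now - 10000').
  const SEEN_WINDOW_MS = 10000;

  // A client is considered for a capacity-priority message only if it was seen
  // recently and has queued jobs; blacklisting (score 0) fails the first check.
  function eligibleForPriority(lastSeenScore, queuedCount, now) {
    return lastSeenScore >= now - SEEN_WINDOW_MS && queuedCount > 0;
  }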

@@ -28,2 +28,3 @@ "use strict";

this.clients = {};
this.capacityPriorityCounters = {};
this.sharedConnection = this.connection != null;

@@ -85,31 +86,54 @@

return _asyncToGenerator(function* () {
var capacity, data, pos, priorityClient, type;
pos = message.indexOf(":");
var _ref2 = [message.slice(0, pos), message.slice(pos + 1)];
type = _ref2[0];
data = _ref2[1];
var capacity, counter, data, drained, e, newCapacity, pos, priorityClient, rawCapacity, type;
if (type === "capacity") {
return yield _this2.instance._drainAll(data.length > 0 ? ~~data : void 0);
} else if (type === "capacity-priority") {
var _data$split = data.split(":");
try {
pos = message.indexOf(":");
var _ref2 = [message.slice(0, pos), message.slice(pos + 1)];
type = _ref2[0];
data = _ref2[1];
var _data$split2 = _slicedToArray(_data$split, 2);
if (type === "capacity") {
return yield _this2.instance._drainAll(data.length > 0 ? ~~data : void 0);
} else if (type === "capacity-priority") {
var _data$split = data.split(":");
capacity = _data$split2[0];
priorityClient = _data$split2[1];
var _data$split2 = _slicedToArray(_data$split, 3);
if (priorityClient === _this2.clientId) {
yield _this2.instance._drainAll(capacity.length > 0 ? ~~capacity : void 0);
return yield _this2.clients.client.publish(_this2.instance.channel(), "capacity:");
} else {
yield new _this2.Promise(function (resolve, reject) {
return setTimeout(resolve, 500);
});
return yield _this2.instance._drainAll(capacity.length > 0 ? ~~capacity : void 0);
rawCapacity = _data$split2[0];
priorityClient = _data$split2[1];
counter = _data$split2[2];
capacity = rawCapacity.length > 0 ? ~~rawCapacity : void 0;
if (priorityClient === _this2.clientId) {
drained = yield _this2.instance._drainAll(capacity);
newCapacity = capacity != null ? capacity - (drained || 0) : "";
return yield _this2.clients.client.publish(_this2.instance.channel(), `capacity-priority:${newCapacity}::${counter}`);
} else if (priorityClient === "") {
clearTimeout(_this2.capacityPriorityCounters[counter]);
delete _this2.capacityPriorityCounters[counter];
return _this2.instance._drainAll(capacity);
} else {
return _this2.capacityPriorityCounters[counter] = setTimeout(
/*#__PURE__*/
_asyncToGenerator(function* () {
var e;
try {
delete _this2.capacityPriorityCounters[counter];
yield _this2.runScript("blacklist_client", [priorityClient]);
return yield _this2.instance._drainAll(capacity);
} catch (error) {
e = error;
return _this2.instance.Events.trigger("error", e);
}
}), 1000);
}
} else if (type === "message") {
return _this2.instance.Events.trigger("message", data);
} else if (type === "blocked") {
return yield _this2.instance._dropAllQueued();
}
} else if (type === "message") {
return _this2.instance.Events.trigger("message", data);
} else if (type === "blocked") {
return _this2.instance._dropAllQueued();
} catch (error) {
e = error;
return _this2.instance.Events.trigger("error", e);
}
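
The message handler above implements the new capacity-priority handshake. process_tick.lua now appends an incrementing counter to the message, giving the form capacity-priority:<capacity>:<clientId>:<counter>. The targeted client drains what it can and acknowledges by publishing capacity-priority:<remainingCapacity>::<counter> (empty client field); every other client arms a 1-second timer keyed by the counter, and if no acknowledgement arrives in time it runs the blacklist_client script against the unresponsive client and drains the capacity itself. A condensed sketch of that flow, using a hypothetical io adapter rather than the library's internal API:

  const pendingAcks = {}; // counter -> timeout handle (mirrors capacityPriorityCounters)

  function onCapacityPriority({ capacity, priorityClient, counter, clientId, io }) {
    if (priorityClient === clientId) {
      // We were chosen: drain, then acknowledge with whatever capacity is left.
      return io.drainAll(capacity).then((drained) => {
        const remaining = capacity != null ? capacity - (drained || 0) : "";
        return io.publishAck(remaining, counter);
      });
    } else if (priorityClient === "") {
      // Acknowledgement from the chosen client: cancel the fallback timer and drain.
      clearTimeout(pendingAcks[counter]);
      delete pendingAcks[counter];
      return io.drainAll(capacity);
    } else {
      // Someone else was chosen: give them 1s to acknowledge, then blacklist
      // the silent client and take the capacity ourselves.
      pendingAcks[counter] = setTimeout(async () => {
        delete pendingAcks[counter];
        await io.blacklist(priorityClient);
        await io.drainAll(capacity);
      }, 1000);
    }
  }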

@@ -251,9 +275,9 @@ })();

var _ref3 = yield _this7.runScript("register", _this7.prepareArray([index, weight, expiration]));
var _ref4 = yield _this7.runScript("register", _this7.prepareArray([index, weight, expiration]));
var _ref4 = _slicedToArray(_ref3, 3);
var _ref5 = _slicedToArray(_ref4, 3);
success = _ref4[0];
wait = _ref4[1];
reservoir = _ref4[2];
success = _ref5[0];
wait = _ref5[1];
reservoir = _ref5[2];
return {

@@ -274,9 +298,9 @@ success: _this7.convertBool(success),

try {
var _ref5 = yield _this8.runScript("submit", _this8.prepareArray([queueLength, weight]));
var _ref6 = yield _this8.runScript("submit", _this8.prepareArray([queueLength, weight]));
var _ref6 = _slicedToArray(_ref5, 3);
var _ref7 = _slicedToArray(_ref6, 3);
reachedHWM = _ref6[0];
blocked = _ref6[1];
strategy = _ref6[2];
reachedHWM = _ref7[0];
blocked = _ref7[1];
strategy = _ref7[2];
return {

@@ -283,0 +307,0 @@ reachedHWM: _this8.convertBool(reachedHWM),

@@ -49,3 +49,8 @@ "use strict";

*/
`b_${id}_client_last_registered`];
`b_${id}_client_last_registered`,
/*
ZSET
client -> last seen
*/
`b_${id}_client_last_seen`];
};

@@ -61,5 +66,3 @@

group_check: {
keys: function keys(id) {
return [`b_${id}_settings`];
},
keys: exports.allKeys,
headers: [],

@@ -75,2 +78,8 @@ refresh_expiration: false,

},
blacklist_client: {
keys: exports.allKeys,
headers: ["validate_keys"],
refresh_expiration: false,
code: lua["blacklist_client.lua"]
},
heartbeat: {

@@ -77,0 +86,0 @@ keys: exports.allKeys,
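
With the script registry changes above, group_check switches from its own single-key list to the shared allKeys list, a blacklist_client entry is registered (validate_keys header, no expiration refresh), and every limiter now owns eight Redis keys (KEYS[1] through KEYS[8] in refs.lua), the eighth being new in 2.15.2; the updated test assertion c.mustEqual(limiterKeys(limiter).length, 8) further down reflects the same count. For reference, the per-id key list now looks roughly like this; the last two names appear verbatim above, and the first six are inferred from the refs.lua variable names, so they may not match the source byte for byte:

  // Assumed key naming, following the `b_${id}_...` pattern visible above.
  const allKeys = (id) => [
    `b_${id}_settings`,               // settings_key
    `b_${id}_job_weights`,            // job_weights_key
    `b_${id}_job_expirations`,        // job_expirations_key
    `b_${id}_job_clients`,            // job_clients_key
    `b_${id}_client_running`,         // client_running_key
    `b_${id}_client_num_queued`,      // client_num_queued_key
    `b_${id}_client_last_registered`, // client_last_registered_key
    `b_${id}_client_last_seen`        // client_last_seen_key (new in 2.15.2)
  ];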

@@ -1,1 +0,1 @@

{"version":"2.15.1"}
{"version":"2.15.2"}

@@ -611,3 +611,3 @@ /**

var version = "2.15.1";
var version = "2.15.2";
var version$1 = {

@@ -1058,3 +1058,3 @@ version: version

if (this.queued() === 0) {
return this.Promise.resolve(false);
return this.Promise.resolve(null);
}

@@ -1064,3 +1064,3 @@ queue = this._queues.getFirst();

if ((capacity != null) && options.weight > capacity) {
return this.Promise.resolve(false);
return this.Promise.resolve(null);
}

@@ -1082,4 +1082,6 @@ this.Events.trigger("debug", `Draining ${options.id}`, {args, options});

this._run(next, wait, index, 0);
return this.Promise.resolve(options.weight);
} else {
return this.Promise.resolve(null);
}
return this.Promise.resolve(success);
});

@@ -1089,8 +1091,10 @@ });

_drainAll(capacity) {
return this._drainOne(capacity).then((success) => {
if (success) {
return this._drainAll();
_drainAll(capacity, total = 0) {
return this._drainOne(capacity).then((drained) => {
var newCapacity;
if (drained != null) {
newCapacity = capacity != null ? capacity - drained : capacity;
return this._drainAll(newCapacity, total + drained);
} else {
return this.Promise.resolve(success);
return this.Promise.resolve(total);
}

@@ -1145,3 +1149,3 @@ }).catch((e) => {

}, this._drainOne = () => {
return this.Promise.resolve(false);
return this.Promise.resolve(null);
}, this._registerLock.schedule(() => {

@@ -1148,0 +1152,0 @@ return this._submitLock.schedule(() => {

{
"name": "bottleneck",
"version": "2.15.1",
"version": "2.15.2",
"description": "Distributed task scheduler and rate limiter",

@@ -5,0 +5,0 @@ "main": "lib/index.js",

@@ -238,3 +238,3 @@ var makeTest = require('./context')

runCommand(c.limiter, 'hset', [settings_key, 'version', '2.8.0']),
runCommand(c.limiter, 'hdel', [settings_key, 'done']),
runCommand(c.limiter, 'hdel', [settings_key, 'done', 'capacityPriorityCounter']),
runCommand(c.limiter, 'hset', [settings_key, 'lastReservoirRefresh', ''])

@@ -257,2 +257,3 @@ ])

'reservoirRefreshAmount',
'capacityPriorityCounter',
'lastReservoirRefresh'

@@ -264,3 +265,3 @@ ])

assert(parseInt(lastReservoirRefresh) > Date.now() - 500)
c.mustEqual(values.slice(0, values.length - 1), ['2.14.0', '0', '', ''])
c.mustEqual(values.slice(0, values.length - 1), ['2.15.2', '0', '', '', '0'])
})

@@ -426,3 +427,3 @@ .then(function () {

var getData = function (limiter) {
c.mustEqual(limiterKeys(limiter).length, 7) // Asserting, to remember to edit this test when keys change
c.mustEqual(limiterKeys(limiter).length, 8) // Asserting, to remember to edit this test when keys change
var [

@@ -435,3 +436,4 @@ settings_key,

client_num_queued_key,
client_last_registered_key
client_last_registered_key,
client_last_seen_key
] = limiterKeys(limiter)

@@ -446,3 +448,4 @@

runCommand(limiter1, 'hvals', [client_num_queued_key]),
runCommand(limiter1, 'zrange', [client_last_registered_key, '0', '-1', 'withscores'])
runCommand(limiter1, 'zrange', [client_last_registered_key, '0', '-1', 'withscores']),
runCommand(limiter1, 'zrange', [client_last_seen_key, '0', '-1', 'withscores'])
])

@@ -487,3 +490,4 @@ }

client_num_queued,
client_last_registered
client_last_registered,
client_last_seen
]) {

@@ -498,2 +502,3 @@ c.mustEqual(settings, ['15', '0'])

c.mustEqual(client_last_registered[1], '0')
assert(client_last_seen[1] > Date.now() - 1000)
var passed = Date.now() - parseFloat(client_last_registered[3])

@@ -514,3 +519,4 @@ assert(passed > 0 && passed < 20)

client_num_queued,
client_last_registered
client_last_registered,
client_last_seen
]) {

@@ -525,2 +531,3 @@ c.mustEqual(settings, ['1', '14'])

c.mustEqual(client_last_registered[1], '0')
assert(client_last_seen[1] > Date.now() - 1000)
var passed = Date.now() - parseFloat(client_last_registered[3])

@@ -682,3 +689,3 @@ assert(passed > 170 && passed < 200)

.then(function (deleted) {
c.mustEqual(deleted, 4)
c.mustEqual(deleted, 5)
return limiter.disconnect(false)

@@ -698,3 +705,3 @@ })

.then(function (deleted) {
c.mustEqual(deleted, 4)
c.mustEqual(deleted, 5)
return countKeys(limiter)

@@ -711,3 +718,3 @@ })

.then(function (count) {
c.mustEqual(count, 1)
c.mustEqual(count, 2) // init and client_last_seen
return limiter.disconnect(false)

@@ -934,3 +941,3 @@ })

.then(function (counts) {
c.mustEqual(counts, [4, 4, 4])
c.mustEqual(counts, [5, 5, 5])
return Promise.all([

@@ -992,3 +999,3 @@ limiter1.schedule(job, 'a'),

c.mustEqual(done, 1)
return c.wait(500)
return c.wait(400)
})

@@ -999,3 +1006,3 @@ .then(function () {

.then(function (count) {
c.mustEqual(count, 0)
c.mustEqual(count, 1)
return group.disconnect(false)

@@ -1316,3 +1323,59 @@ })

it('Should take the capacity and blacklist if the priority limiter is not responding', async function () {
c = makeTest()
var limiter1 = new Bottleneck({
datastore: process.env.DATASTORE,
clearDatastore: true,
id: 'crash',
timeout: 3000,
maxConcurrent: 1,
trackDoneStatus: true
})
var limiter2 = new Bottleneck({
datastore: process.env.DATASTORE,
clearDatastore: true,
id: 'crash',
timeout: 3000,
maxConcurrent: 1,
trackDoneStatus: true
})
var limiter3 = new Bottleneck({
datastore: process.env.DATASTORE,
clearDatastore: true,
id: 'crash',
timeout: 3000,
maxConcurrent: 1,
trackDoneStatus: true
})
await limiter1.schedule({id: '1'}, c.promise, null, 'A')
await limiter2.schedule({id: '2'}, c.promise, null, 'B')
await limiter3.schedule({id: '3'}, c.promise, null, 'C')
var resolve1, resolve2, resolve3
var p1 = new Promise(function (resolve, reject) {
resolve1 = function (err, n) { resolve(n) }
})
var p2 = new Promise(function (resolve, reject) {
resolve2 = function (err, n) { resolve(n) }
})
var p3 = new Promise(function (resolve, reject) {
resolve3 = function (err, n) { resolve(n) }
})
await limiter1.submit({id: '4'}, c.slowJob, 100, null, 4, resolve1)
await limiter2.submit({id: '5'}, c.slowJob, 100, null, 5, resolve2)
await limiter3.submit({id: '6'}, c.slowJob, 100, null, 6, resolve3)
await limiter2.disconnect(false)
await Promise.all([p1, p3])
c.checkResultsOrder([['A'], ['B'], ['C'], [4], [6]])
await limiter1.disconnect(false)
await limiter2.disconnect(false)
await limiter3.disconnect(false)
})
})
}

Sorry, the diff of this file is too big to display

Sorry, the diffs of the remaining files are not supported yet
