bottleneck
Comparing version 2.16.2 to 2.17.0
{ | ||
"name": "bottleneck", | ||
"main": "bottleneck.js", | ||
"version": "2.16.2", | ||
"version": "2.17.0", | ||
"homepage": "https://github.com/SGrondin/bottleneck", | ||
@@ -6,0 +6,0 @@ "authors": [ |
@@ -571,5 +571,14 @@ "use strict"; | ||
var cb, ref, returned; | ||
var cb, e, ref, returned; | ||
ref = args, (_ref11 = ref, _ref12 = _toArray(_ref11), args = _ref12.slice(0), _ref11), (_splice$call7 = splice.call(args, -1), _splice$call8 = _slicedToArray(_splice$call7, 1), cb = _splice$call8[0], _splice$call7); | ||
returned = task(...args); | ||
returned = function () { | ||
try { | ||
return task(...args); | ||
} catch (error1) { | ||
e = error1; | ||
return this.Promise.reject(e); | ||
} | ||
}.call(this); | ||
return (!((returned != null ? returned.then : void 0) != null && typeof returned.then === "function") ? this.Promise.resolve(returned) : returned).then(function (...args) { | ||
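This hunk wraps the synchronous `task(...args)` call in a try/catch so that a task which throws is turned into a rejected promise instead of an exception escaping the scheduler. A minimal sketch of the visible effect (the limiter options are illustrative; the same behavior is exercised by the new tests further down):

```js
const Bottleneck = require("bottleneck");

const limiter = new Bottleneck({ maxConcurrent: 1, minTime: 100 });

// A task that throws synchronously now surfaces as a rejection of the
// returned promise rather than as an uncaught exception.
const wrapped = limiter.wrap(() => { throw new Error("I will reject"); });

wrapped()
  .then(() => console.log("not reached"))
  .catch((err) => console.log(err.message)); // "I will reject"
```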
@@ -666,2 +675,3 @@ return cb(null, ...args); | ||
heartbeatInterval: 5000, | ||
clientTimeout: 10000, | ||
clientOptions: {}, | ||
@@ -668,0 +678,0 @@ clusterNodes: null, |
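A `clientTimeout` option (default 10000 ms) joins the limiter defaults. Assuming it is passed like the other constructor options when using the Redis datastore, a sketch (connection details are placeholders):

```js
const Bottleneck = require("bottleneck");

// Sketch only: clientTimeout (ms) controls how long a clustering client may
// go unseen before it becomes eligible for cleanup by process_tick.
const limiter = new Bottleneck({
  id: "my-limiter",
  datastore: "redis",
  clientOptions: { host: "127.0.0.1", port: 6379 },
  clientTimeout: 10000
});
```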
{ | ||
"blacklist_client.lua": "local blacklist = ARGV[num_static_argv + 1]\n\nredis.call('zadd', client_last_seen_key, 0, blacklist)\n\nreturn {}\n", | ||
"blacklist_client.lua": "local blacklist = ARGV[num_static_argv + 1]\n\nif redis.call('zscore', client_last_seen_key, blacklist) then\n redis.call('zadd', client_last_seen_key, 0, blacklist)\nend\n\n\nreturn {}\n", | ||
"check.lua": "local weight = tonumber(ARGV[num_static_argv + 1])\n\nlocal capacity = process_tick(now, false)['capacity']\nlocal nextRequest = tonumber(redis.call('hget', settings_key, 'nextRequest'))\n\nreturn conditions_check(capacity, weight) and nextRequest - now <= 0\n", | ||
@@ -12,12 +12,14 @@ "conditions_check.lua": "local conditions_check = function (capacity, weight)\n return capacity == nil or weight <= capacity\nend\n", | ||
"increment_reservoir.lua": "local incr = tonumber(ARGV[num_static_argv + 1])\n\nredis.call('hincrby', settings_key, 'reservoir', incr)\n\nlocal reservoir = process_tick(now, true)['reservoir']\n\nlocal groupTimeout = tonumber(redis.call('hget', settings_key, 'groupTimeout'))\nrefresh_expiration(0, 0, groupTimeout)\n\nreturn reservoir\n", | ||
"init.lua": "local clear = tonumber(ARGV[num_static_argv + 1])\nlocal limiter_version = ARGV[num_static_argv + 2]\nlocal num_local_argv = num_static_argv + 2\n\nif clear == 1 then\n redis.call('del', unpack(KEYS))\nend\n\nif redis.call('exists', settings_key) == 0 then\n -- Create\n local args = {'hmset', settings_key}\n\n for i = num_local_argv + 1, #ARGV do\n table.insert(args, ARGV[i])\n end\n\n redis.call(unpack(args))\n redis.call('hmset', settings_key,\n 'nextRequest', now,\n 'lastReservoirRefresh', now,\n 'running', 0,\n 'done', 0,\n 'unblockTime', 0,\n 'capacityPriorityCounter', 0\n )\n\n if clear == 1 then\n -- Force all connected clients to re-register\n process_tick(now, true)\n end\n\nelse\n -- Apply migrations\n local settings = redis.call('hmget', settings_key,\n 'id',\n 'version'\n )\n local id = settings[1]\n local current_version = settings[2]\n\n if current_version ~= limiter_version then\n local version_digits = {}\n for k, v in string.gmatch(current_version, \"([^.]+)\") do\n table.insert(version_digits, tonumber(k))\n end\n\n -- 2.10.0\n if version_digits[2] < 10 then\n redis.call('hsetnx', settings_key, 'reservoirRefreshInterval', '')\n redis.call('hsetnx', settings_key, 'reservoirRefreshAmount', '')\n redis.call('hsetnx', settings_key, 'lastReservoirRefresh', '')\n redis.call('hsetnx', settings_key, 'done', 0)\n redis.call('hset', settings_key, 'version', '2.10.0')\n end\n\n -- 2.11.1\n if version_digits[2] < 11 and version_digits[3] < 1 then\n if redis.call('hstrlen', settings_key, 'lastReservoirRefresh') == 0 then\n redis.call('hmset', settings_key,\n 'lastReservoirRefresh', now,\n 'version', '2.11.1'\n )\n end\n end\n\n -- 2.14.0\n if version_digits[2] < 14 then\n local old_running_key = 'b_'..id..'_running'\n local old_executing_key = 'b_'..id..'_executing'\n\n if redis.call('exists', old_running_key) == 1 then\n redis.call('rename', old_running_key, job_weights_key)\n end\n if redis.call('exists', old_executing_key) == 1 then\n redis.call('rename', old_executing_key, job_expirations_key)\n end\n redis.call('hset', settings_key, 'version', '2.14.0')\n end\n\n -- 2.15.2\n if version_digits[2] < 15 and version_digits[3] < 2 then\n redis.call('hsetnx', settings_key, 'capacityPriorityCounter', 0)\n redis.call('hset', settings_key, 'version', '2.15.2')\n end\n\n end\n\n process_tick(now, false)\nend\n\nlocal groupTimeout = tonumber(redis.call('hget', settings_key, 'groupTimeout'))\nrefresh_expiration(0, 0, groupTimeout)\n\nreturn {}\n", | ||
"process_tick.lua": "local process_tick = function (now, always_publish)\n\n local compute_capacity = function (maxConcurrent, running, reservoir)\n if maxConcurrent ~= nil and reservoir ~= nil then\n return math.min((maxConcurrent - running), reservoir)\n elseif maxConcurrent ~= nil then\n return maxConcurrent - running\n elseif reservoir ~= nil then\n return reservoir\n else\n return nil\n end\n end\n\n local settings = redis.call('hmget', settings_key,\n 'id',\n 'maxConcurrent',\n 'running',\n 'reservoir',\n 'reservoirRefreshInterval',\n 'reservoirRefreshAmount',\n 'lastReservoirRefresh',\n 'capacityPriorityCounter'\n )\n local id = settings[1]\n local maxConcurrent = tonumber(settings[2])\n local running = tonumber(settings[3])\n local reservoir = tonumber(settings[4])\n local reservoirRefreshInterval = tonumber(settings[5])\n local reservoirRefreshAmount = tonumber(settings[6])\n local lastReservoirRefresh = tonumber(settings[7])\n local capacityPriorityCounter = tonumber(settings[8])\n\n local initial_capacity = compute_capacity(maxConcurrent, running, reservoir)\n\n --\n -- Process 'running' changes\n --\n local expired = redis.call('zrangebyscore', job_expirations_key, '-inf', '('..now)\n\n if #expired > 0 then\n redis.call('zremrangebyscore', job_expirations_key, '-inf', '('..now)\n\n local flush_batch = function (batch, acc)\n local weights = redis.call('hmget', job_weights_key, unpack(batch))\n redis.call('hdel', job_weights_key, unpack(batch))\n local clients = redis.call('hmget', job_clients_key, unpack(batch))\n redis.call('hdel', job_clients_key, unpack(batch))\n\n -- Calculate sum of removed weights\n for i = 1, #weights do\n acc['total'] = acc['total'] + (tonumber(weights[i]) or 0)\n end\n\n -- Calculate sum of removed weights by client\n local client_weights = {}\n for i = 1, #clients do\n if weights[i] ~= nil then\n acc['client_weights'][clients[i]] = (acc['client_weights'][clients[i]] or 0) + (tonumber(weights[i]) or 0)\n end\n end\n end\n\n local acc = {\n ['total'] = 0,\n ['client_weights'] = {}\n }\n local batch_size = 1000\n\n -- Compute changes to Zsets and apply changes to Hashes\n for i = 1, #expired, batch_size do\n local batch = {}\n for j = i, math.min(i + batch_size - 1, #expired) do\n table.insert(batch, expired[j])\n end\n\n flush_batch(batch, acc)\n end\n\n -- Apply changes to Zsets\n if acc['total'] > 0 then\n redis.call('hincrby', settings_key, 'done', acc['total'])\n running = tonumber(redis.call('hincrby', settings_key, 'running', -acc['total']))\n end\n\n for client, weight in pairs(acc['client_weights']) do\n redis.call('zincrby', client_running_key, -weight, client)\n end\n end\n\n --\n -- Process 'reservoir' changes\n --\n local reservoirRefreshActive = reservoirRefreshInterval ~= nil and reservoirRefreshAmount ~= nil\n if reservoirRefreshActive and now >= lastReservoirRefresh + reservoirRefreshInterval then\n reservoir = reservoirRefreshAmount\n redis.call('hmset', settings_key,\n 'reservoir', reservoir,\n 'lastReservoirRefresh', now\n )\n end\n\n --\n -- Broadcast capacity changes\n --\n local final_capacity = compute_capacity(maxConcurrent, running, reservoir)\n\n if always_publish or (initial_capacity ~= nil and final_capacity == nil) then\n -- always_publish or was not unlimited, now unlimited\n redis.call('publish', 'b_'..id, 'capacity:'..(final_capacity or ''))\n\n elseif initial_capacity ~= nil and final_capacity ~= nil and final_capacity > initial_capacity then\n -- capacity was increased\n -- send the capacity message to the limiter 
having the lowest number of running jobs\n -- the tiebreaker is the limiter having not registered a job in the longest time\n\n local lowest_concurrency_value = nil\n local lowest_concurrency_clients = {}\n local lowest_concurrency_last_registered = {}\n local client_concurrencies = redis.call('zrange', client_running_key, 0, -1, 'withscores')\n local valid_clients = redis.call('zrangebyscore', client_last_seen_key, (now - 10000), 'inf')\n local valid_clients_lookup = {}\n for i = 1, #valid_clients do\n valid_clients_lookup[valid_clients[i]] = true\n end\n\n for i = 1, #client_concurrencies, 2 do\n local client = client_concurrencies[i]\n local concurrency = tonumber(client_concurrencies[i+1])\n\n if (\n lowest_concurrency_value == nil or lowest_concurrency_value == concurrency\n ) and (\n valid_clients_lookup[client]\n ) and (\n tonumber(redis.call('hget', client_num_queued_key, client)) > 0\n ) then\n lowest_concurrency_value = concurrency\n table.insert(lowest_concurrency_clients, client)\n local last_registered = tonumber(redis.call('zscore', client_last_registered_key, client))\n table.insert(lowest_concurrency_last_registered, last_registered)\n end\n end\n\n if #lowest_concurrency_clients > 0 then\n local position = 1\n local earliest = lowest_concurrency_last_registered[1]\n\n for i,v in ipairs(lowest_concurrency_last_registered) do\n if v < earliest then\n position = i\n earliest = v\n end\n end\n\n local next_client = lowest_concurrency_clients[position]\n redis.call('publish', 'b_'..id,\n 'capacity-priority:'..(final_capacity or '')..\n ':'..next_client..\n ':'..capacityPriorityCounter\n )\n redis.call('hincrby', settings_key, 'capacityPriorityCounter', '1')\n else\n redis.call('publish', 'b_'..id, 'capacity:'..(final_capacity or ''))\n end\n end\n\n return {\n ['capacity'] = final_capacity,\n ['running'] = running,\n ['reservoir'] = reservoir\n }\nend\n", | ||
"queued.lua": "local valid_clients = redis.call('zrangebyscore', client_last_seen_key, (now - 10000), 'inf')\nlocal client_queued = redis.call('hmget', client_num_queued_key, unpack(valid_clients))\n\nlocal sum = 0\nfor i = 1, #client_queued do\n sum = sum + tonumber(client_queued[i])\nend\n\nreturn sum\n", | ||
"init.lua": "local clear = tonumber(ARGV[num_static_argv + 1])\nlocal limiter_version = ARGV[num_static_argv + 2]\nlocal num_local_argv = num_static_argv + 2\n\nif clear == 1 then\n redis.call('del', unpack(KEYS))\nend\n\nif redis.call('exists', settings_key) == 0 then\n -- Create\n local args = {'hmset', settings_key}\n\n for i = num_local_argv + 1, #ARGV do\n table.insert(args, ARGV[i])\n end\n\n redis.call(unpack(args))\n redis.call('hmset', settings_key,\n 'nextRequest', now,\n 'lastReservoirRefresh', now,\n 'running', 0,\n 'done', 0,\n 'unblockTime', 0,\n 'capacityPriorityCounter', 0\n )\n\nelse\n -- Apply migrations\n local settings = redis.call('hmget', settings_key,\n 'id',\n 'version'\n )\n local id = settings[1]\n local current_version = settings[2]\n\n if current_version ~= limiter_version then\n local version_digits = {}\n for k, v in string.gmatch(current_version, \"([^.]+)\") do\n table.insert(version_digits, tonumber(k))\n end\n\n -- 2.10.0\n if version_digits[2] < 10 then\n redis.call('hsetnx', settings_key, 'reservoirRefreshInterval', '')\n redis.call('hsetnx', settings_key, 'reservoirRefreshAmount', '')\n redis.call('hsetnx', settings_key, 'lastReservoirRefresh', '')\n redis.call('hsetnx', settings_key, 'done', 0)\n redis.call('hset', settings_key, 'version', '2.10.0')\n end\n\n -- 2.11.1\n if version_digits[2] < 11 or (version_digits[2] == 11 and version_digits[3] < 1) then\n if redis.call('hstrlen', settings_key, 'lastReservoirRefresh') == 0 then\n redis.call('hmset', settings_key,\n 'lastReservoirRefresh', now,\n 'version', '2.11.1'\n )\n end\n end\n\n -- 2.14.0\n if version_digits[2] < 14 then\n local old_running_key = 'b_'..id..'_running'\n local old_executing_key = 'b_'..id..'_executing'\n\n if redis.call('exists', old_running_key) == 1 then\n redis.call('rename', old_running_key, job_weights_key)\n end\n if redis.call('exists', old_executing_key) == 1 then\n redis.call('rename', old_executing_key, job_expirations_key)\n end\n redis.call('hset', settings_key, 'version', '2.14.0')\n end\n\n -- 2.15.2\n if version_digits[2] < 15 or (version_digits[2] == 15 and version_digits[3] < 2) then\n redis.call('hsetnx', settings_key, 'capacityPriorityCounter', 0)\n redis.call('hset', settings_key, 'version', '2.15.2')\n end\n\n -- 2.17.0\n if version_digits[2] < 17 then\n redis.call('hsetnx', settings_key, 'clientTimeout', 10000)\n redis.call('hset', settings_key, 'version', '2.17.0')\n end\n\n end\n\n process_tick(now, false)\nend\n\nlocal groupTimeout = tonumber(redis.call('hget', settings_key, 'groupTimeout'))\nrefresh_expiration(0, 0, groupTimeout)\n\nreturn {}\n", | ||
"process_tick.lua": "local process_tick = function (now, always_publish)\n\n local compute_capacity = function (maxConcurrent, running, reservoir)\n if maxConcurrent ~= nil and reservoir ~= nil then\n return math.min((maxConcurrent - running), reservoir)\n elseif maxConcurrent ~= nil then\n return maxConcurrent - running\n elseif reservoir ~= nil then\n return reservoir\n else\n return nil\n end\n end\n\n local settings = redis.call('hmget', settings_key,\n 'id',\n 'maxConcurrent',\n 'running',\n 'reservoir',\n 'reservoirRefreshInterval',\n 'reservoirRefreshAmount',\n 'lastReservoirRefresh',\n 'capacityPriorityCounter',\n 'clientTimeout'\n )\n local id = settings[1]\n local maxConcurrent = tonumber(settings[2])\n local running = tonumber(settings[3])\n local reservoir = tonumber(settings[4])\n local reservoirRefreshInterval = tonumber(settings[5])\n local reservoirRefreshAmount = tonumber(settings[6])\n local lastReservoirRefresh = tonumber(settings[7])\n local capacityPriorityCounter = tonumber(settings[8])\n local clientTimeout = tonumber(settings[9])\n\n local initial_capacity = compute_capacity(maxConcurrent, running, reservoir)\n\n --\n -- Process 'running' changes\n --\n local expired = redis.call('zrangebyscore', job_expirations_key, '-inf', '('..now)\n\n if #expired > 0 then\n redis.call('zremrangebyscore', job_expirations_key, '-inf', '('..now)\n\n local flush_batch = function (batch, acc)\n local weights = redis.call('hmget', job_weights_key, unpack(batch))\n redis.call('hdel', job_weights_key, unpack(batch))\n local clients = redis.call('hmget', job_clients_key, unpack(batch))\n redis.call('hdel', job_clients_key, unpack(batch))\n\n -- Calculate sum of removed weights\n for i = 1, #weights do\n acc['total'] = acc['total'] + (tonumber(weights[i]) or 0)\n end\n\n -- Calculate sum of removed weights by client\n local client_weights = {}\n for i = 1, #clients do\n local removed = tonumber(weights[i]) or 0\n if removed > 0 then\n acc['client_weights'][clients[i]] = (acc['client_weights'][clients[i]] or 0) + removed\n end\n end\n end\n\n local acc = {\n ['total'] = 0,\n ['client_weights'] = {}\n }\n local batch_size = 1000\n\n -- Compute changes to Zsets and apply changes to Hashes\n for i = 1, #expired, batch_size do\n local batch = {}\n for j = i, math.min(i + batch_size - 1, #expired) do\n table.insert(batch, expired[j])\n end\n\n flush_batch(batch, acc)\n end\n\n -- Apply changes to Zsets\n if acc['total'] > 0 then\n redis.call('hincrby', settings_key, 'done', acc['total'])\n running = tonumber(redis.call('hincrby', settings_key, 'running', -acc['total']))\n end\n\n for client, weight in pairs(acc['client_weights']) do\n redis.call('zincrby', client_running_key, -weight, client)\n end\n end\n\n --\n -- Process 'reservoir' changes\n --\n local reservoirRefreshActive = reservoirRefreshInterval ~= nil and reservoirRefreshAmount ~= nil\n if reservoirRefreshActive and now >= lastReservoirRefresh + reservoirRefreshInterval then\n reservoir = reservoirRefreshAmount\n redis.call('hmset', settings_key,\n 'reservoir', reservoir,\n 'lastReservoirRefresh', now\n )\n end\n\n --\n -- Clear unresponsive clients\n --\n local unresponsive = redis.call('zrangebyscore', client_last_seen_key, '-inf', (now - clientTimeout))\n if #unresponsive > 0 then\n local terminated_clients = {}\n for i = 1, #unresponsive do\n if tonumber(redis.call('zscore', client_running_key, unresponsive[i])) == 0 then\n table.insert(terminated_clients, unresponsive[i])\n end\n end\n if #terminated_clients > 0 then\n 
redis.call('zrem', client_running_key, unpack(terminated_clients))\n redis.call('hdel', client_num_queued_key, unpack(terminated_clients))\n redis.call('zrem', client_last_registered_key, unpack(terminated_clients))\n redis.call('zrem', client_last_seen_key, unpack(terminated_clients))\n end\n end\n\n --\n -- Broadcast capacity changes\n --\n local final_capacity = compute_capacity(maxConcurrent, running, reservoir)\n\n if always_publish or (initial_capacity ~= nil and final_capacity == nil) then\n -- always_publish or was not unlimited, now unlimited\n redis.call('publish', 'b_'..id, 'capacity:'..(final_capacity or ''))\n\n elseif initial_capacity ~= nil and final_capacity ~= nil and final_capacity > initial_capacity then\n -- capacity was increased\n -- send the capacity message to the limiter having the lowest number of running jobs\n -- the tiebreaker is the limiter having not registered a job in the longest time\n\n local lowest_concurrency_value = nil\n local lowest_concurrency_clients = {}\n local lowest_concurrency_last_registered = {}\n local client_concurrencies = redis.call('zrange', client_running_key, 0, -1, 'withscores')\n\n for i = 1, #client_concurrencies, 2 do\n local client = client_concurrencies[i]\n local concurrency = tonumber(client_concurrencies[i+1])\n\n if (\n lowest_concurrency_value == nil or lowest_concurrency_value == concurrency\n ) and (\n tonumber(redis.call('hget', client_num_queued_key, client)) > 0\n ) then\n lowest_concurrency_value = concurrency\n table.insert(lowest_concurrency_clients, client)\n local last_registered = tonumber(redis.call('zscore', client_last_registered_key, client))\n table.insert(lowest_concurrency_last_registered, last_registered)\n end\n end\n\n if #lowest_concurrency_clients > 0 then\n local position = 1\n local earliest = lowest_concurrency_last_registered[1]\n\n for i,v in ipairs(lowest_concurrency_last_registered) do\n if v < earliest then\n position = i\n earliest = v\n end\n end\n\n local next_client = lowest_concurrency_clients[position]\n redis.call('publish', 'b_'..id,\n 'capacity-priority:'..(final_capacity or '')..\n ':'..next_client..\n ':'..capacityPriorityCounter\n )\n redis.call('hincrby', settings_key, 'capacityPriorityCounter', '1')\n else\n redis.call('publish', 'b_'..id, 'capacity:'..(final_capacity or ''))\n end\n end\n\n return {\n ['capacity'] = final_capacity,\n ['running'] = running,\n ['reservoir'] = reservoir\n }\nend\n", | ||
"queued.lua": "local clientTimeout = tonumber(redis.call('hget', settings_key, 'clientTimeout'))\nlocal valid_clients = redis.call('zrangebyscore', client_last_seen_key, (now - clientTimeout), 'inf')\nlocal client_queued = redis.call('hmget', client_num_queued_key, unpack(valid_clients))\n\nlocal sum = 0\nfor i = 1, #client_queued do\n sum = sum + tonumber(client_queued[i])\nend\n\nreturn sum\n", | ||
"refresh_expiration.lua": "local refresh_expiration = function (now, nextRequest, groupTimeout)\n\n if groupTimeout ~= nil then\n local ttl = (nextRequest + groupTimeout) - now\n\n for i = 1, #KEYS do\n redis.call('pexpire', KEYS[i], ttl)\n end\n end\n\nend\n", | ||
"refs.lua": "local settings_key = KEYS[1]\nlocal job_weights_key = KEYS[2]\nlocal job_expirations_key = KEYS[3]\nlocal job_clients_key = KEYS[4]\nlocal client_running_key = KEYS[5]\nlocal client_num_queued_key = KEYS[6]\nlocal client_last_registered_key = KEYS[7]\nlocal client_last_seen_key = KEYS[8]\n\nlocal now = tonumber(ARGV[1])\nlocal client = ARGV[2]\nlocal queued = ARGV[3]\n\nlocal num_static_argv = 3\n", | ||
"refs.lua": "local settings_key = KEYS[1]\nlocal job_weights_key = KEYS[2]\nlocal job_expirations_key = KEYS[3]\nlocal job_clients_key = KEYS[4]\nlocal client_running_key = KEYS[5]\nlocal client_num_queued_key = KEYS[6]\nlocal client_last_registered_key = KEYS[7]\nlocal client_last_seen_key = KEYS[8]\n\nlocal now = tonumber(ARGV[1])\nlocal client = ARGV[2]\n\nlocal num_static_argv = 2\n", | ||
"register.lua": "local index = ARGV[num_static_argv + 1]\nlocal weight = tonumber(ARGV[num_static_argv + 2])\nlocal expiration = tonumber(ARGV[num_static_argv + 3])\n\nlocal state = process_tick(now, false)\nlocal capacity = state['capacity']\nlocal reservoir = state['reservoir']\n\nlocal settings = redis.call('hmget', settings_key,\n 'nextRequest',\n 'minTime',\n 'groupTimeout'\n)\nlocal nextRequest = tonumber(settings[1])\nlocal minTime = tonumber(settings[2])\nlocal groupTimeout = tonumber(settings[3])\n\nif conditions_check(capacity, weight) then\n\n redis.call('hincrby', settings_key, 'running', weight)\n redis.call('hset', job_weights_key, index, weight)\n if expiration ~= nil then\n redis.call('zadd', job_expirations_key, now + expiration, index)\n end\n redis.call('hset', job_clients_key, index, client)\n redis.call('zincrby', client_running_key, weight, client)\n redis.call('hincrby', client_num_queued_key, client, -1)\n redis.call('zadd', client_last_registered_key, now, client)\n\n local wait = math.max(nextRequest - now, 0)\n local newNextRequest = now + wait + minTime\n\n if reservoir == nil then\n redis.call('hset', settings_key,\n 'nextRequest', newNextRequest\n )\n else\n reservoir = reservoir - weight\n redis.call('hmset', settings_key,\n 'reservoir', reservoir,\n 'nextRequest', newNextRequest\n )\n end\n\n refresh_expiration(now, newNextRequest, groupTimeout)\n\n return {true, wait, reservoir}\n\nelse\n return {false}\nend\n", | ||
"register_client.lua": "local queued = tonumber(ARGV[num_static_argv + 1])\n\n-- Could have been re-registered concurrently\nif not redis.call('zscore', client_last_seen_key, client) then\n redis.call('zadd', client_running_key, 0, client)\n redis.call('hset', client_num_queued_key, client, queued)\n redis.call('zadd', client_last_registered_key, 0, client)\nend\n\nredis.call('zadd', client_last_seen_key, now, client)\n\nreturn {}\n", | ||
"running.lua": "return process_tick(now, false)['running']\n", | ||
"submit.lua": "local queueLength = tonumber(ARGV[num_static_argv + 1])\nlocal weight = tonumber(ARGV[num_static_argv + 2])\n\nlocal capacity = process_tick(now, false)['capacity']\n\nlocal settings = redis.call('hmget', settings_key,\n 'id',\n 'maxConcurrent',\n 'highWater',\n 'nextRequest',\n 'strategy',\n 'unblockTime',\n 'penalty',\n 'minTime',\n 'groupTimeout'\n)\nlocal id = settings[1]\nlocal maxConcurrent = tonumber(settings[2])\nlocal highWater = tonumber(settings[3])\nlocal nextRequest = tonumber(settings[4])\nlocal strategy = tonumber(settings[5])\nlocal unblockTime = tonumber(settings[6])\nlocal penalty = tonumber(settings[7])\nlocal minTime = tonumber(settings[8])\nlocal groupTimeout = tonumber(settings[9])\n\nif maxConcurrent ~= nil and weight > maxConcurrent then\n return redis.error_reply('OVERWEIGHT:'..weight..':'..maxConcurrent)\nend\n\nlocal reachedHWM = (highWater ~= nil and queueLength == highWater\n and not (\n conditions_check(capacity, weight)\n and nextRequest - now <= 0\n )\n)\n\nlocal blocked = strategy == 3 and (reachedHWM or unblockTime >= now)\n\nif blocked then\n local computedPenalty = penalty\n if computedPenalty == nil then\n if minTime == 0 then\n computedPenalty = 5000\n else\n computedPenalty = 15 * minTime\n end\n end\n\n local newNextRequest = now + computedPenalty + minTime\n\n redis.call('hmset', settings_key,\n 'unblockTime', now + computedPenalty,\n 'nextRequest', newNextRequest\n )\n\n local clients_queued_reset = redis.call('hkeys', client_num_queued_key)\n local queued_reset = {}\n for i = 1, #clients_queued_reset do\n table.insert(queued_reset, clients_queued_reset[i])\n table.insert(queued_reset, 0)\n end\n redis.call('hmset', client_num_queued_key, unpack(queued_reset))\n\n redis.call('publish', 'b_'..id, 'blocked:')\n\n refresh_expiration(now, newNextRequest, groupTimeout)\nend\n\nif not blocked and not reachedHWM then\n redis.call('hincrby', client_num_queued_key, client, 1)\nend\n\nreturn {reachedHWM, blocked, strategy}\n", | ||
"update_settings.lua": "local args = {'hmset', settings_key}\n\nfor i = num_static_argv + 1, #ARGV do\n table.insert(args, ARGV[i])\nend\n\nredis.call(unpack(args))\n\nprocess_tick(now, true)\n\nlocal groupTimeout = tonumber(redis.call('hget', settings_key, 'groupTimeout'))\nrefresh_expiration(0, 0, groupTimeout)\n\nreturn {}\n", | ||
"validate_keys.lua": "if not (redis.call('exists', settings_key) == 1) then\n return redis.error_reply('SETTINGS_KEY_NOT_FOUND')\nend\n\nif not redis.call('zscore', client_running_key, client) then\n -- Register new client\n redis.call('zadd', client_running_key, 0, client)\n redis.call('hset', client_num_queued_key, client, queued)\n redis.call('zadd', client_last_registered_key, 0, client)\nend\n\nredis.call('zadd', client_last_seen_key, now, client)\n" | ||
"validate_client.lua": "if not redis.call('zscore', client_last_seen_key, client) then\n return redis.error_reply('UNKNOWN_CLIENT')\nend\n\nredis.call('zadd', client_last_seen_key, now, client)\n", | ||
"validate_keys.lua": "if not (redis.call('exists', settings_key) == 1) then\n return redis.error_reply('SETTINGS_KEY_NOT_FOUND')\nend\n" | ||
} |
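In the updated process_tick.lua, a client is only cleared when it has not been seen within `clientTimeout` milliseconds and its running weight is zero. A hedged JavaScript restatement of that condition (function name and arguments are illustrative, not library API):

```js
// Illustrative restatement of the cleanup condition in process_tick.lua.
function shouldClearClient(nowMs, lastSeenMs, clientTimeoutMs, runningWeight) {
  const unresponsive = lastSeenMs <= nowMs - clientTimeoutMs;
  return unresponsive && runningWeight === 0;
}

// shouldClearClient(10000, 9500, 1000, 0) === false  (seen recently)
// shouldClearClient(10000, 8500, 1000, 0) === true   (stale, nothing running)
// shouldClearClient(10000, 8500, 1000, 2) === false  (stale, but jobs still running)
```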
@@ -52,3 +52,3 @@ "use strict"; | ||
}).then(() => { | ||
return this.runScript("heartbeat", []); | ||
return this.runScript("register_client", [this.instance.queued()]); | ||
}).then(() => { | ||
@@ -156,3 +156,3 @@ var base; | ||
return _asyncToGenerator(function* () { | ||
if (!(name === "init" || name === "heartbeat")) { | ||
if (!(name === "init" || name === "register_client")) { | ||
yield _this3.ready; | ||
@@ -163,3 +163,3 @@ } | ||
var all_args, arr; | ||
all_args = [Date.now(), _this3.clientId, _this3.instance.queued()].concat(args); | ||
all_args = [Date.now(), _this3.clientId].concat(args); | ||
@@ -177,8 +177,14 @@ _this3.instance.Events.trigger("debug", `Calling Redis script: ${name}.lua`, all_args); | ||
}).catch(e => { | ||
if (e.message === "SETTINGS_KEY_NOT_FOUND" && name !== "heartbeat") { | ||
return _this3.runScript("init", _this3.prepareInitSettings(false)).then(() => { | ||
if (e.message === "SETTINGS_KEY_NOT_FOUND") { | ||
if (name === "heartbeat") { | ||
return _this3.Promise.resolve(); | ||
} else { | ||
return _this3.runScript("init", _this3.prepareInitSettings(false)).then(() => { | ||
return _this3.runScript(name, args); | ||
}); | ||
} | ||
} else if (e.message === "UNKNOWN_CLIENT") { | ||
return _this3.runScript("register_client", [_this3.instance.queued()]).then(() => { | ||
return _this3.runScript(name, args); | ||
}); | ||
} else if (name === "heartbeat") { | ||
return _this3.Promise.resolve(); | ||
} else { | ||
@@ -220,3 +226,4 @@ return _this3.Promise.reject(e); | ||
version: this.instance.version, | ||
groupTimeout: this.timeout | ||
groupTimeout: this.timeout, | ||
clientTimeout: this.clientTimeout | ||
})); | ||
@@ -223,0 +230,0 @@ args.unshift(clear ? 1 : 0, this.instance.version); |
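The datastore changes above replace the connect-time heartbeat with register_client and add an UNKNOWN_CLIENT recovery path: if a script reports that the caller is no longer tracked (for example after being cleared as unresponsive), the client re-registers itself and retries the original script. A simplified sketch of that flow with invented names (retryableRun, datastore); the heartbeat special-casing shown in the diff is omitted:

```js
// Hypothetical helper mirroring the recovery logic in runScript().
async function retryableRun(datastore, name, args) {
  try {
    return await datastore.runScript(name, args);
  } catch (e) {
    if (e.message === "SETTINGS_KEY_NOT_FOUND") {
      // The limiter's keys are gone (e.g. group timeout): re-init, then retry.
      await datastore.runScript("init", datastore.prepareInitSettings(false));
      return datastore.runScript(name, args);
    }
    if (e.message === "UNKNOWN_CLIENT") {
      // This client was cleared as unresponsive: re-register, then retry.
      await datastore.runScript("register_client", [datastore.instance.queued()]);
      return datastore.runScript(name, args);
    }
    throw e;
  }
}
```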
@@ -8,2 +8,3 @@ "use strict"; | ||
validate_keys: lua["validate_keys.lua"], | ||
validate_client: lua["validate_client.lua"], | ||
refresh_expiration: lua["refresh_expiration.lua"], | ||
@@ -71,6 +72,12 @@ process_tick: lua["process_tick.lua"], | ||
}, | ||
blacklist_client: { | ||
register_client: { | ||
keys: exports.allKeys, | ||
headers: ["validate_keys"], | ||
refresh_expiration: false, | ||
code: lua["register_client.lua"] | ||
}, | ||
blacklist_client: { | ||
keys: exports.allKeys, | ||
headers: ["validate_keys", "validate_client"], | ||
refresh_expiration: false, | ||
code: lua["blacklist_client.lua"] | ||
@@ -80,3 +87,3 @@ }, | ||
keys: exports.allKeys, | ||
headers: ["validate_keys", "process_tick"], | ||
headers: ["validate_keys", "validate_client", "process_tick"], | ||
refresh_expiration: false, | ||
@@ -87,3 +94,3 @@ code: lua["heartbeat.lua"] | ||
keys: exports.allKeys, | ||
headers: ["validate_keys", "process_tick"], | ||
headers: ["validate_keys", "validate_client", "process_tick"], | ||
refresh_expiration: true, | ||
@@ -94,3 +101,3 @@ code: lua["update_settings.lua"] | ||
keys: exports.allKeys, | ||
headers: ["validate_keys", "process_tick"], | ||
headers: ["validate_keys", "validate_client", "process_tick"], | ||
refresh_expiration: false, | ||
@@ -101,3 +108,3 @@ code: lua["running.lua"] | ||
keys: exports.allKeys, | ||
headers: ["validate_keys"], | ||
headers: ["validate_keys", "validate_client"], | ||
refresh_expiration: false, | ||
@@ -108,3 +115,3 @@ code: lua["queued.lua"] | ||
keys: exports.allKeys, | ||
headers: ["validate_keys", "process_tick"], | ||
headers: ["validate_keys", "validate_client", "process_tick"], | ||
refresh_expiration: false, | ||
@@ -115,3 +122,3 @@ code: lua["done.lua"] | ||
keys: exports.allKeys, | ||
headers: ["validate_keys", "process_tick", "conditions_check"], | ||
headers: ["validate_keys", "validate_client", "process_tick", "conditions_check"], | ||
refresh_expiration: false, | ||
@@ -122,3 +129,3 @@ code: lua["check.lua"] | ||
keys: exports.allKeys, | ||
headers: ["validate_keys", "process_tick", "conditions_check"], | ||
headers: ["validate_keys", "validate_client", "process_tick", "conditions_check"], | ||
refresh_expiration: true, | ||
@@ -129,3 +136,3 @@ code: lua["submit.lua"] | ||
keys: exports.allKeys, | ||
headers: ["validate_keys", "process_tick", "conditions_check"], | ||
headers: ["validate_keys", "validate_client", "process_tick", "conditions_check"], | ||
refresh_expiration: true, | ||
@@ -136,3 +143,3 @@ code: lua["register.lua"] | ||
keys: exports.allKeys, | ||
headers: ["validate_keys", "process_tick"], | ||
headers: ["validate_keys", "validate_client", "process_tick"], | ||
refresh_expiration: true, | ||
@@ -143,3 +150,3 @@ code: lua["free.lua"] | ||
keys: exports.allKeys, | ||
headers: ["validate_keys", "process_tick"], | ||
headers: ["validate_keys", "validate_client", "process_tick"], | ||
refresh_expiration: false, | ||
@@ -150,3 +157,3 @@ code: lua["current_reservoir.lua"] | ||
keys: exports.allKeys, | ||
headers: ["validate_keys", "process_tick"], | ||
headers: ["validate_keys", "validate_client", "process_tick"], | ||
refresh_expiration: true, | ||
@@ -153,0 +160,0 @@ code: lua["increment_reservoir.lua"] |
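Most script definitions above now include validate_client among their headers; that fragment runs ahead of the script body and returns the UNKNOWN_CLIENT error when the caller is missing from client_last_seen_key. A hypothetical sketch of how headers might compose into the Lua source that gets loaded (assembleScript is invented; the real concatenation happens elsewhere in the library):

```js
// Hypothetical: prepend the shared refs and each header fragment to the body.
function assembleScript(refsLua, headerLuas, bodyLua) {
  return [refsLua, ...headerLuas, bodyLua].join("\n");
}

// e.g. for the "running" script after this change:
// assembleScript(
//   lua["refs.lua"],
//   [lua["validate_keys.lua"], lua["validate_client.lua"], lua["process_tick.lua"]],
//   lua["running.lua"]
// );
```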
@@ -1,1 +0,1 @@ | ||
{"version":"2.16.2"} | ||
{"version":"2.17.0"} |
light.js
@@ -622,3 +622,3 @@ /** | ||
var version = "2.16.2"; | ||
var version = "2.17.0"; | ||
var version$1 = { | ||
@@ -1265,5 +1265,12 @@ version: version | ||
wrapped = (...args) => { | ||
var cb, ref, returned; | ||
var cb, e, ref, returned; | ||
ref = args, [...args] = ref, [cb] = splice$1.call(args, -1); | ||
returned = task(...args); | ||
returned = (function() { | ||
try { | ||
return task(...args); | ||
} catch (error1) { | ||
e = error1; | ||
return this.Promise.reject(e); | ||
} | ||
}).call(this); | ||
return (!(((returned != null ? returned.then : void 0) != null) && typeof returned.then === "function") ? this.Promise.resolve(returned) : returned).then(function(...args) { | ||
@@ -1362,2 +1369,3 @@ return cb(null, ...args); | ||
heartbeatInterval: 5000, | ||
clientTimeout: 10000, | ||
clientOptions: {}, | ||
@@ -1364,0 +1372,0 @@ clusterNodes: null, |
{ | ||
"name": "bottleneck", | ||
"version": "2.16.2", | ||
"version": "2.17.0", | ||
"description": "Distributed task scheduler and rate limiter", | ||
@@ -5,0 +5,0 @@ "main": "lib/index.js", |
@@ -51,3 +51,3 @@ # bottleneck | ||
```js | ||
const Bottleneck = require("bottleneck"); | ||
import Bottleneck from "bottleneck"; | ||
@@ -330,3 +330,3 @@ // Note: To support older browsers and Node <6.0, you must import the ES5 bundle instead. | ||
1. **Received**. You new job has been added to your limiter. Bottleneck needs to check whether if can be accepted into the queue. | ||
1. **Received**. You new job has been added to your limiter. Bottleneck needs to check whether it can be accepted into the queue. | ||
2. **Queued**. Bottleneck has accepted your job, but it can not tell at what exact timestamp it will run yet, because it is dependent on previous jobs. | ||
@@ -333,0 +333,0 @@ 3. **Running**. Your job is not in the queue anymore, it will be executed after a delay that was computed according to your `minTime` setting. |
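The README example switches from CommonJS require to an ES module import. Both forms should load the same default export; which one applies depends on the consumer's module system and bundler:

```js
// CommonJS consumers keep using require:
// const Bottleneck = require("bottleneck");

// ES module consumers (the form the updated README now shows):
import Bottleneck from "bottleneck";

const limiter = new Bottleneck({ maxConcurrent: 1, minTime: 250 });
```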
@@ -238,3 +238,3 @@ var makeTest = require('./context') | ||
runCommand(c.limiter, 'hset', [settings_key, 'version', '2.8.0']), | ||
runCommand(c.limiter, 'hdel', [settings_key, 'done', 'capacityPriorityCounter']), | ||
runCommand(c.limiter, 'hdel', [settings_key, 'done', 'capacityPriorityCounter', 'clientTimeout']), | ||
runCommand(c.limiter, 'hset', [settings_key, 'lastReservoirRefresh', '']) | ||
@@ -258,2 +258,4 @@ ]) | ||
'capacityPriorityCounter', | ||
'clientTimeout', | ||
// Add new values here, before lastReservoirRefresh | ||
'lastReservoirRefresh' | ||
@@ -265,3 +267,3 @@ ]) | ||
assert(parseInt(lastReservoirRefresh) > Date.now() - 500) | ||
c.mustEqual(values.slice(0, values.length - 1), ['2.15.2', '0', '', '', '0']) | ||
c.mustEqual(values.slice(0, values.length - 1), ['2.17.0', '0', '', '', '0', '10000']) | ||
}) | ||
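This migration test now expects the settings hash to end up at version 2.17.0 with a back-filled clientTimeout of 10000. The corresponding step in init.lua only runs for limiters created before 2.17.0 and uses hsetnx, so existing values are never overwritten. Roughly, in illustrative JavaScript:

```js
// Illustrative restatement of the 2.17.0 migration gate in init.lua.
function needsClientTimeoutBackfill(currentVersion) {
  const [, minor] = currentVersion.split(".").map(Number);
  return minor < 17; // e.g. "2.15.2" -> true, "2.17.0" -> false
}
```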
@@ -530,2 +532,137 @@ .then(function () { | ||
it('Should clear unresponsive clients', async function () { | ||
c = makeTest({ | ||
id: 'unresponsive', | ||
maxConcurrent: 1, | ||
timeout: 1000, | ||
clientTimeout: 100, | ||
heartbeat: 50 | ||
}) | ||
const limiter2 = new Bottleneck({ | ||
id: 'unresponsive', | ||
datastore: process.env.DATASTORE | ||
}) | ||
await Promise.all([c.limiter.running(), limiter2.running()]) | ||
const client_running_key = limiterKeys(limiter2)[4] | ||
const client_num_queued_key = limiterKeys(limiter2)[5] | ||
const client_last_registered_key = limiterKeys(limiter2)[6] | ||
const client_last_seen_key = limiterKeys(limiter2)[7] | ||
const numClients = () => Promise.all([ | ||
runCommand(c.limiter, 'zcard', [client_running_key]), | ||
runCommand(c.limiter, 'hlen', [client_num_queued_key]), | ||
runCommand(c.limiter, 'zcard', [client_last_registered_key]), | ||
runCommand(c.limiter, 'zcard', [client_last_seen_key]) | ||
]) | ||
c.mustEqual(await numClients(), [2, 2, 2, 2]) | ||
await limiter2.disconnect(false) | ||
await c.wait(150) | ||
await c.limiter.running() | ||
c.mustEqual(await numClients(), [1, 1, 1, 1]) | ||
}) | ||
it('Should not clear unresponsive clients with unexpired running jobs', async function () { | ||
c = makeTest({ | ||
id: 'unresponsive-unexpired', | ||
maxConcurrent: 1, | ||
timeout: 1000, | ||
clientTimeout: 200, | ||
heartbeat: 2000 | ||
}) | ||
const limiter2 = new Bottleneck({ | ||
id: 'unresponsive-unexpired', | ||
datastore: process.env.DATASTORE | ||
}) | ||
await c.limiter.ready() | ||
await limiter2.ready() | ||
const client_running_key = limiterKeys(limiter2)[4] | ||
const client_num_queued_key = limiterKeys(limiter2)[5] | ||
const client_last_registered_key = limiterKeys(limiter2)[6] | ||
const client_last_seen_key = limiterKeys(limiter2)[7] | ||
const numClients = () => Promise.all([ | ||
runCommand(limiter2, 'zcard', [client_running_key]), | ||
runCommand(limiter2, 'hlen', [client_num_queued_key]), | ||
runCommand(limiter2, 'zcard', [client_last_registered_key]), | ||
runCommand(limiter2, 'zcard', [client_last_seen_key]) | ||
]) | ||
const job = c.limiter.schedule(c.slowPromise, 500, null, 1) | ||
await c.wait(300) | ||
// running() triggers process_tick and that will attempt to remove client 1 | ||
// but it shouldn't do it because it has a running job | ||
c.mustEqual(await limiter2.running(), 1) | ||
c.mustEqual(await numClients(), [2, 2, 2, 2]) | ||
await job | ||
c.mustEqual(await limiter2.running(), 0) | ||
await limiter2.disconnect(false) | ||
}) | ||
it('Should clear unresponsive clients after last jobs are expired', async function () { | ||
c = makeTest({ | ||
id: 'unresponsive-expired', | ||
maxConcurrent: 1, | ||
timeout: 1000, | ||
clientTimeout: 200, | ||
heartbeat: 2000 | ||
}) | ||
const limiter2 = new Bottleneck({ | ||
id: 'unresponsive-expired', | ||
datastore: process.env.DATASTORE | ||
}) | ||
await c.limiter.ready() | ||
await limiter2.ready() | ||
const client_running_key = limiterKeys(limiter2)[4] | ||
const client_num_queued_key = limiterKeys(limiter2)[5] | ||
const client_last_registered_key = limiterKeys(limiter2)[6] | ||
const client_last_seen_key = limiterKeys(limiter2)[7] | ||
const numClients = () => Promise.all([ | ||
runCommand(limiter2, 'zcard', [client_running_key]), | ||
runCommand(limiter2, 'hlen', [client_num_queued_key]), | ||
runCommand(limiter2, 'zcard', [client_last_registered_key]), | ||
runCommand(limiter2, 'zcard', [client_last_seen_key]) | ||
]) | ||
const job = c.limiter.schedule({ expiration: 250 }, c.slowPromise, 300, null, 1) | ||
await c.wait(100) // wait for it to register | ||
c.mustEqual(await c.limiter.running(), 1) | ||
c.mustEqual(await numClients(), [2,2,2,2]) | ||
let dropped = false | ||
try { | ||
await job | ||
} catch (e) { | ||
if (e.message === 'This job timed out after 250 ms.') { | ||
dropped = true | ||
} else { | ||
throw e | ||
} | ||
} | ||
assert(dropped) | ||
await c.wait(200) | ||
c.mustEqual(await limiter2.running(), 0) | ||
c.mustEqual(await numClients(), [1,1,1,1]) | ||
await limiter2.disconnect(false) | ||
}) | ||
it('Should use shared settings', function () { | ||
@@ -1363,3 +1500,2 @@ c = makeTest({ maxConcurrent: 2 }) | ||
await limiter1.disconnect(false) | ||
@@ -1366,0 +1502,0 @@ await limiter2.disconnect(false) |
@@ -83,2 +83,14 @@ var makeTest = require('./context') | ||
it('Should automatically wrap an exception in a rejected promise - schedule()', function () { | ||
c = makeTest({maxConcurrent: 1, minTime: 100}) | ||
return c.limiter.schedule(() => { | ||
throw new Error('I will reject') | ||
}) | ||
.then(() => assert(false)) | ||
.catch(err => { | ||
assert(err.message === 'I will reject'); | ||
}) | ||
}) | ||
describe('Wrap', function () { | ||
@@ -102,2 +114,22 @@ it('Should wrap', function () { | ||
it('Should automatically wrap a returned value in a resolved promise', function () { | ||
c = makeTest({maxConcurrent: 1, minTime: 100}) | ||
fn = c.limiter.wrap(() => { return 7 }); | ||
return fn().then(result => { | ||
assert(result === 7); | ||
}) | ||
}) | ||
it('Should automatically wrap an exception in a rejected promise', function () { | ||
c = makeTest({maxConcurrent: 1, minTime: 100}) | ||
fn = c.limiter.wrap(() => { throw new Error('I will reject') }); | ||
return fn().then(() => assert(false)).catch(error => { | ||
assert(error.message === 'I will reject'); | ||
}) | ||
}) | ||
it('Should inherit the original target for wrapped methods', function () { | ||
@@ -104,0 +136,0 @@ c = makeTest({maxConcurrent: 1, minTime: 100}) |