Skip to content

Commit

Permalink
Merge pull request #801 from nyaruka/lua_embed
Browse files Browse the repository at this point in the history
Move Redis lua scripts into embedded lua files for readability
  • Loading branch information
rowanseymour authored Nov 11, 2024
2 parents 03f274d + 24ddd50 commit cbd4a77
Show file tree
Hide file tree
Showing 5 changed files with 206 additions and 193 deletions.
14 changes: 14 additions & 0 deletions queue/lua/complete.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
-- KEYS: [QueueType, Queue]

-- decrement throttled if present
local throttled = tonumber(redis.call("zadd", KEYS[1] .. ":throttled", "XX", "CH", "INCR", -1, KEYS[2]))

-- if we didn't decrement anything, do so to our active set
if not throttled or throttled == 0 then
local active = tonumber(redis.call("zincrby", KEYS[1] .. ":active", -1, KEYS[2]))

-- reset to zero if we somehow go below
if active < 0 then
redis.call("zadd", KEYS[1] .. ":active", 0, KEYS[2])
end
end
25 changes: 25 additions & 0 deletions queue/lua/dethrottle.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
-- KEYS: [QueueType]

-- get all the keys from our throttle list
local throttled = redis.call("zrange", KEYS[1] .. ":throttled", 0, -1, "WITHSCORES")

-- add them to our active list
if next(throttled) then
local activeKey = KEYS[1] .. ":active"
for i=1,#throttled,2 do
redis.call("zincrby", activeKey, throttled[i+1], throttled[i])
end
redis.call("del", KEYS[1] .. ":throttled")
end

-- get all the keys in the future
local future = redis.call("zrange", KEYS[1] .. ":future", 0, -1, "WITHSCORES")

-- add them to our active list
if next(future) then
local activeKey = KEYS[1] .. ":active"
for i=1,#future,2 do
redis.call("zincrby", activeKey, future[i+1], future[i])
end
redis.call("del", KEYS[1] .. ":future")
end
124 changes: 124 additions & 0 deletions queue/lua/pop.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
-- KEYS: [EpochMS QueueType]

-- get the first key off our active list
local result = redis.call("zrange", KEYS[2] .. ":active", 0, 0, "WITHSCORES")
local queue = result[1]
local workers = result[2]

-- nothing? return nothing
if not queue then
return {"empty", ""}
end

-- figure out our max transaction per second
local delim = string.find(queue, "|")
local tps = 0
local tpsKey = ""

local queueName = ""

if delim then
queueName = string.sub(queue, string.len(KEYS[2])+2, delim-1)
tps = tonumber(string.sub(queue, delim+1))
end

if queueName then
local rateLimitKey = "rate_limit:" .. queueName
local rateLimitEngaged = redis.call("get", rateLimitKey)
if rateLimitEngaged then
redis.call("zincrby", KEYS[2] .. ":throttled", workers, queue)
redis.call("zrem", KEYS[2] .. ":active", queue)
return {"retry", ""}
end
end

-- if we have a tps, then check whether we exceed it
if tps > 0 then
tpsKey = queue .. ":tps:" .. math.floor(KEYS[1])
local curr = redis.call("get", tpsKey)

-- we are at or above our tps, move to our throttled queue
if curr and tonumber(curr) >= tps then
redis.call("zincrby", KEYS[2] .. ":throttled", workers, queue)
redis.call("zrem", KEYS[2] .. ":active", queue)
return {"retry", ""}
end
end

-- pop our next value out, first from our default queue
local resultQueue = queue .. "/1"
local result = redis.call("zrangebyscore", resultQueue, 0, "+inf", "WITHSCORES", "LIMIT", 0, 1)

-- keep track as to whether this result is in the future (and therefore ineligible)
local isFutureResult = result[1] and tonumber(result[2]) > tonumber(KEYS[1])

-- if we didn't find one, try again from our bulk queue
if not result[1] or isFutureResult then
-- check if we are rate limited for bulk queue
local rateLimitBulkKey = "rate_limit_bulk:" .. queueName
local rateLimitBulk = redis.call("get", rateLimitBulkKey)
if rateLimitBulk then
return {"retry", ""}
end

-- we are not pause check our bulk queue
local bulkQueue = queue .. "/0"
local bulkResult = redis.call("zrangebyscore", bulkQueue, 0, "+inf", "WITHSCORES", "LIMIT", 0, 1)

-- if we got a result
if bulkResult[1] then
-- if it is in the future, set ourselves as in the future
if tonumber(bulkResult[2]) > tonumber(KEYS[1]) then
isFutureResult = true

-- otherwise, this is a valid result
else
redis.call("echo", "found result")
isFutureResult = false
result = bulkResult
resultQueue = bulkQueue
end
end
end

-- if we found one
if result[1] and not isFutureResult then
-- then remove it from the queue
redis.call('zremrangebyrank', resultQueue, 0, 0)

-- and add a worker to this queue
redis.call("zincrby", KEYS[2] .. ":active", 1, queue)

-- parse it as JSON to get the first element out
local valueList = cjson.decode(result[1])
local popValue = cjson.encode(valueList[1])
table.remove(valueList, 1)

-- increment our tps for this second if we have a limit
if tps > 0 then
redis.call("incrby", tpsKey, popValue["tps_cost"] or 1)
redis.call("expire", tpsKey, 10)
end

-- encode it back if there is anything left
if table.getn(valueList) > 0 then
local remaining = cjson.encode(valueList)

-- schedule it in the future 3 seconds on our main queue
redis.call("zadd", queue .. "/1", tonumber(KEYS[1]) + 3, remaining)
redis.call("zincrby", KEYS[2] .. ":future", 0, queue)
end

return {queue, popValue}

-- otherwise, the queue only contains future results, remove from active and add to future, have the caller retry
elseif isFutureResult then
redis.call("zincrby", KEYS[2] .. ":future", 0, queue)
redis.call("zrem", KEYS[2] .. ":active", queue)
return {"retry", ""}

-- otherwise, the queue is empty, remove it from active
else
redis.call("zrem", KEYS[2] .. ":active", queue)
return {"retry", ""}
end
26 changes: 26 additions & 0 deletions queue/lua/push.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
-- KEYS: [EpochMS, QueueType, QueueName, TPS, Priority, Value]

-- first push onto our specific queue
-- our queue name is built from the type, name and tps, usually something like: "msgs:uuid1-uuid2-uuid3-uuid4|tps"
local queueKey = KEYS[2] .. ":" .. KEYS[3] .. "|" .. KEYS[4]

-- our priority queue name also includes the priority of the message (we have one queue for default and one for bulk)
local priorityQueueKey = queueKey .. "/" .. KEYS[5]
redis.call("zadd", priorityQueueKey, KEYS[1], KEYS[6])

local tps = tonumber(KEYS[4])

-- if we have a TPS, check whether we are currently throttled
local curr = -1
if tps > 0 then
local tpsKey = queueKey .. ":tps:" .. math.floor(KEYS[1])
curr = tonumber(redis.call("get", tpsKey))
end

-- if we aren't then add to our active
if not curr or curr < tps then
redis.call("zincrby", KEYS[2] .. ":active", 0, queueKey)
return 1
else
return 0
end
Loading

0 comments on commit cbd4a77

Please sign in to comment.